This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new c55f30e HDDS-6412: EC: Handle ECReplicationConfig in initiateMultipartUpload API (#3184)
c55f30e is described below
commit c55f30edf0fd36746c950761dbc3ff76f6d1c785
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Mon Mar 28 13:55:49 2022 -0700
HDDS-6412: EC: Handle ECReplicationConfig in initiateMultipartUpload API (#3184)
---
...OzoneManagerProtocolClientSideTranslatorPB.java | 10 +++--
.../org/apache/hadoop/ozone/om/TestOmMetrics.java | 48 ++++++++++++++--------
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 48 ++++++++++------------
.../hadoop/ozone/client/OzoneBucketStub.java | 8 ++++
.../s3/endpoint/TestInitiateMultipartUpload.java | 35 ++++++++++++++--
.../ozone/s3/endpoint/TestPermissionCheck.java | 3 +-
6 files changed, 99 insertions(+), 53 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 6f381d0..b1ef8fe 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -963,9 +963,13 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
OzoneAcl.toProtobuf(a)).collect(Collectors.toList()));
if (omKeyArgs.getReplicationConfig() != null) {
- keyArgs.setFactor(
- ReplicationConfig
- .getLegacyFactor(omKeyArgs.getReplicationConfig()));
+ if (omKeyArgs.getReplicationConfig() instanceof ECReplicationConfig) {
+ keyArgs.setEcReplicationConfig(
+ ((ECReplicationConfig)
omKeyArgs.getReplicationConfig()).toProto());
+ } else {
+ keyArgs.setFactor(ReplicationConfig
+ .getLegacyFactor(omKeyArgs.getReplicationConfig()));
+ }
keyArgs.setType(omKeyArgs.getReplicationConfig().getReplicationType());
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index 7ff190a..c9babb8 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@ -34,7 +34,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
@@ -253,7 +255,7 @@ public class TestOmMetrics {
@Test
public void testKeyOps() throws Exception {
// This test needs a cluster with DNs and SCM to wait on safemode
- clusterBuilder.setNumDatanodes(3);
+ clusterBuilder.setNumDatanodes(5);
conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, true);
startCluster();
String volumeName = UUID.randomUUID().toString();
@@ -262,7 +264,8 @@ public class TestOmMetrics {
.getInternalState(ozoneManager, "keyManager");
KeyManager mockKm = Mockito.spy(keyManager);
TestDataUtil.createVolumeAndBucket(cluster, volumeName, bucketName);
- OmKeyArgs keyArgs = createKeyArgs(volumeName, bucketName);
+ OmKeyArgs keyArgs = createKeyArgs(volumeName, bucketName,
+
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
doKeyOps(keyArgs);
MetricsRecordBuilder omMetrics = getMetrics("OMMetrics");
@@ -275,18 +278,26 @@ public class TestOmMetrics {
assertCounter("NumKeys", 0L, omMetrics);
assertCounter("NumInitiateMultipartUploads", 1L, omMetrics);
- keyArgs = createKeyArgs(volumeName, bucketName);
+ keyArgs = createKeyArgs(volumeName, bucketName,
+ new ECReplicationConfig("rs-3-2-1024K"));
+ doKeyOps(keyArgs);
+ omMetrics = getMetrics("OMMetrics");
+ assertCounter("NumKeyOps", 14L, omMetrics);
+
+ keyArgs = createKeyArgs(volumeName, bucketName,
+
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
OpenKeySession keySession = writeClient.openKey(keyArgs);
writeClient.commitKey(keyArgs, keySession.getId());
- keyArgs = createKeyArgs(volumeName, bucketName);
+ keyArgs = createKeyArgs(volumeName, bucketName,
+
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
keySession = writeClient.openKey(keyArgs);
writeClient.commitKey(keyArgs, keySession.getId());
- keyArgs = createKeyArgs(volumeName, bucketName);
+ keyArgs = createKeyArgs(volumeName, bucketName,
+
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
keySession = writeClient.openKey(keyArgs);
writeClient.commitKey(keyArgs, keySession.getId());
writeClient.deleteKey(keyArgs);
-
omMetrics = getMetrics("OMMetrics");
assertCounter("NumKeys", 2L, omMetrics);
@@ -301,17 +312,18 @@ public class TestOmMetrics {
// inject exception to test for Failure Metrics on the write path
mockWritePathExceptions(OmBucketInfo.class);
- keyArgs = createKeyArgs(volumeName, bucketName);
+ keyArgs = createKeyArgs(volumeName, bucketName,
+
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
doKeyOps(keyArgs);
omMetrics = getMetrics("OMMetrics");
- assertCounter("NumKeyOps", 21L, omMetrics);
- assertCounter("NumKeyAllocate", 5L, omMetrics);
- assertCounter("NumKeyLookup", 2L, omMetrics);
- assertCounter("NumKeyDeletes", 3L, omMetrics);
- assertCounter("NumKeyLists", 2L, omMetrics);
- assertCounter("NumTrashKeyLists", 2L, omMetrics);
- assertCounter("NumInitiateMultipartUploads", 2L, omMetrics);
+ assertCounter("NumKeyOps", 28L, omMetrics);
+ assertCounter("NumKeyAllocate", 6L, omMetrics);
+ assertCounter("NumKeyLookup", 3L, omMetrics);
+ assertCounter("NumKeyDeletes", 4L, omMetrics);
+ assertCounter("NumKeyLists", 3L, omMetrics);
+ assertCounter("NumTrashKeyLists", 3L, omMetrics);
+ assertCounter("NumInitiateMultipartUploads", 3L, omMetrics);
assertCounter("NumKeyAllocateFails", 1L, omMetrics);
assertCounter("NumKeyLookupFails", 1L, omMetrics);
@@ -556,8 +568,8 @@ public class TestOmMetrics {
}
}
- private OmKeyArgs createKeyArgs(String volumeName, String bucketName)
- throws IOException {
+ private OmKeyArgs createKeyArgs(String volumeName, String bucketName,
+ ReplicationConfig repConfig) throws IOException {
OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(new ContainerBlockID(1, 1)))
.setPipeline(MockPipeline.createSingleNodePipeline())
@@ -571,10 +583,10 @@ public class TestOmMetrics {
.setBucketName(bucketName)
.setKeyName(keyName)
.setAcls(Lists.emptyList())
- .setReplicationConfig(RatisReplicationConfig.getInstance(
- HddsProtos.ReplicationFactor.THREE))
+ .setReplicationConfig(repConfig)
.build();
}
+
private OmVolumeArgs createVolumeArgs() {
String volumeName = UUID.randomUUID().toString();
return new OmVolumeArgs.Builder()
diff --git
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 99a08d6..d048550 100644
---
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -53,8 +53,8 @@ import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -178,24 +178,12 @@ public class ObjectEndpoint extends EndpointBase {
try {
copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
+ boolean storageTypeDefault = StringUtils.isEmpty(storageType);
// Normal put object
OzoneBucket bucket = getBucket(bucketName);
- ReplicationConfig clientConfiguredReplicationConfig = null;
- String replication = ozoneConfiguration.get(OZONE_REPLICATION);
- if (replication != null) {
- clientConfiguredReplicationConfig = ReplicationConfig.parse(
- ReplicationType.valueOf(ozoneConfiguration
- .get(OZONE_REPLICATION_TYPE, OZONE_REPLICATION_TYPE_DEFAULT)),
- replication, ozoneConfiguration);
- }
- ReplicationConfig replicationConfig = S3Utils
- .resolveS3ClientSideReplicationConfig(storageType,
- clientConfiguredReplicationConfig,
bucket.getReplicationConfig());
- boolean storageTypeDefault = false;
- if (storageType == null || storageType.equals("")) {
- storageTypeDefault = true;
- }
+ ReplicationConfig replicationConfig =
+ getReplicationConfig(bucket, storageType);
if (copyHeader != null) {
//Copy object, as copy source available.
@@ -498,17 +486,11 @@ public class ObjectEndpoint extends EndpointBase {
OzoneBucket ozoneBucket = getBucket(bucket);
String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
- S3StorageType s3StorageType;
- if (storageType == null || storageType.equals("")) {
- s3StorageType = S3StorageType.getDefault(ozoneConfiguration);
- } else {
- s3StorageType = S3Utils.toS3StorageType(storageType);
- }
- ReplicationType replicationType = s3StorageType.getType();
- ReplicationFactor replicationFactor = s3StorageType.getFactor();
+ ReplicationConfig replicationConfig =
+ getReplicationConfig(ozoneBucket, storageType);
- OmMultipartInfo multipartInfo = ozoneBucket
- .initiateMultipartUpload(key, replicationType, replicationFactor);
+ OmMultipartInfo multipartInfo =
+ ozoneBucket.initiateMultipartUpload(key, replicationConfig);
MultipartUploadInitiateResponse multipartUploadInitiateResponse = new
MultipartUploadInitiateResponse();
@@ -531,6 +513,20 @@ public class ObjectEndpoint extends EndpointBase {
}
}
+ private ReplicationConfig getReplicationConfig(OzoneBucket ozoneBucket,
+ String storageType) throws OS3Exception {
+ ReplicationConfig clientConfiguredReplicationConfig = null;
+ String replication = ozoneConfiguration.get(OZONE_REPLICATION);
+ if (replication != null) {
+ clientConfiguredReplicationConfig = ReplicationConfig.parse(
+ ReplicationType.valueOf(ozoneConfiguration
+ .get(OZONE_REPLICATION_TYPE, OZONE_REPLICATION_TYPE_DEFAULT)),
+ replication, ozoneConfiguration);
+ }
+ return S3Utils.resolveS3ClientSideReplicationConfig(storageType,
+ clientConfiguredReplicationConfig, ozoneBucket.getReplicationConfig());
+ }
+
/**
* Complete a multipart upload.
*/
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index fcddfde..f4b5846 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -228,6 +228,14 @@ public class OzoneBucketStub extends OzoneBucket {
}
@Override
+ public OmMultipartInfo initiateMultipartUpload(String keyName,
+ ReplicationConfig repConfig) throws IOException {
+ String uploadID = UUID.randomUUID().toString();
+ multipartUploadIdMap.put(keyName, uploadID);
+ return new OmMultipartInfo(getVolumeName(), getName(), keyName, uploadID);
+ }
+
+ @Override
public OzoneOutputStream createMultipartKey(String key, long size,
int partNumber, String uploadID)
throws IOException {
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestInitiateMultipartUpload.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestInitiateMultipartUpload.java
index 2fa396f..8bd0b84d 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestInitiateMultipartUpload.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestInitiateMultipartUpload.java
@@ -20,10 +20,12 @@
package org.apache.hadoop.ozone.s3.endpoint;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.jetbrains.annotations.NotNull;
import org.junit.Test;
import org.mockito.Mockito;
@@ -53,10 +55,7 @@ public class TestInitiateMultipartUpload {
when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn(
"STANDARD");
- ObjectEndpoint rest = new ObjectEndpoint();
- rest.setHeaders(headers);
- rest.setClient(client);
- rest.setOzoneConfiguration(new OzoneConfiguration());
+ ObjectEndpoint rest = getObjectEndpoint(client, headers);
Response response = rest.initializeMultipartUpload(bucket, key);
@@ -74,4 +73,32 @@ public class TestInitiateMultipartUpload {
assertNotNull(multipartUploadInitiateResponse.getUploadID());
assertNotEquals(multipartUploadInitiateResponse.getUploadID(), uploadID);
}
+
+ @Test
+ public void testInitiateMultipartUploadWithECKey() throws Exception {
+ String bucket = OzoneConsts.S3_BUCKET;
+ String key = OzoneConsts.KEY;
+ OzoneClient client = new OzoneClientStub();
+ client.getObjectStore().createS3Bucket(bucket);
+ HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+ ObjectEndpoint rest = getObjectEndpoint(client, headers);
+ client.getObjectStore().getS3Bucket(bucket)
+ .setReplicationConfig(new ECReplicationConfig("rs-3-2-1024K"));
+ Response response = rest.initializeMultipartUpload(bucket, key);
+
+ assertEquals(200, response.getStatus());
+ MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+ (MultipartUploadInitiateResponse) response.getEntity();
+ assertNotNull(multipartUploadInitiateResponse.getUploadID());
+ }
+
+ @NotNull
+ private ObjectEndpoint getObjectEndpoint(OzoneClient client,
+ HttpHeaders headers) {
+ ObjectEndpoint rest = new ObjectEndpoint();
+ rest.setHeaders(headers);
+ rest.setClient(client);
+ rest.setOzoneConfiguration(new OzoneConfiguration());
+ return rest;
+ }
}
diff --git
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
index 4aa5ef1..3f0f21b 100644
---
a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
+++
b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPermissionCheck.java
@@ -304,8 +304,7 @@ public class TestPermissionCheck {
@Test
public void testMultiUploadKey() throws IOException {
Mockito.when(objectStore.getS3Bucket(anyString())).thenReturn(bucket);
- doThrow(exception).when(bucket)
- .initiateMultipartUpload(anyString(), any(), any());
+ doThrow(exception).when(bucket).initiateMultipartUpload(anyString(),
any());
ObjectEndpoint objectEndpoint = new ObjectEndpoint();
objectEndpoint.setClient(client);
objectEndpoint.setHeaders(headers);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]