This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new d6dc278 EC: Make ECReplicationConfig stored as bucket level attributes. (#2401)
d6dc278 is described below
commit d6dc2781952bfb0e971668ca36308064fc8e5314
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Thu Aug 26 08:41:07 2021 -0700
EC: Make ECReplicationConfig stored as bucket level attributes. (#2401)
---
.../hdds/client/DefaultReplicationConfig.java | 110 +++++++++++++++++++++
.../hadoop/hdds/client/ReplicationFactor.java | 16 +++
.../apache/hadoop/hdds/client/ReplicationType.java | 20 ++++
.../common/src/main/resources/ozone-default.xml | 14 +--
.../hadoop/hdds/client/TestReplicationConfig.java | 7 +-
.../interface-client/src/main/proto/hdds.proto | 8 ++
.../scm/pipeline/TestRatisPipelineProvider.java | 6 ++
.../org/apache/hadoop/ozone/client/BucketArgs.java | 26 ++++-
.../apache/hadoop/ozone/client/OzoneBucket.java | 41 +++++++-
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 16 ++-
.../hadoop/ozone/client/MockOmTransport.java | 110 +++++++++++++++++----
.../hadoop/ozone/client/TestOzoneECClient.java | 39 ++++++--
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 12 +++
.../hadoop/ozone/om/helpers/OmBucketInfo.java | 71 +++++++++++--
.../apache/hadoop/ozone/protocolPB/OMPBHelper.java | 54 ++++++++++
.../hadoop/ozone/om/helpers/TestOmBucketInfo.java | 52 ++++++++++
.../hadoop/ozone/TestOzoneConfigurationFields.java | 6 ++
.../ozone/client/rpc/TestECKeyOutputStream.java | 90 ++++++++++++++++-
.../org/apache/hadoop/ozone/om/TestOmAcls.java | 9 +-
.../src/main/proto/OmClientProtocol.proto | 6 +-
.../apache/hadoop/ozone/om/OzoneConfigUtil.java | 60 +++++++++++
.../org/apache/hadoop/ozone/om/OzoneManager.java | 23 ++++-
.../om/request/file/OMDirectoryCreateRequest.java | 7 +-
.../ozone/om/request/file/OMFileCreateRequest.java | 26 +++--
.../request/file/OMFileCreateRequestWithFSO.java | 15 ++-
.../om/request/key/OMAllocateBlockRequest.java | 3 +-
.../ozone/om/request/key/OMKeyCreateRequest.java | 20 +++-
.../om/request/key/OMKeyCreateRequestWithFSO.java | 15 ++-
.../hadoop/ozone/om/request/key/OMKeyRequest.java | 16 ++-
.../S3InitiateMultipartUploadRequest.java | 15 +--
.../S3InitiateMultipartUploadRequestWithFSO.java | 13 ++-
.../ozone/om/request/key/TestOMKeyRequest.java | 3 +
.../s3/multipart/TestS3MultipartRequest.java | 8 ++
.../ozone/om/response/TestCleanupTableInfo.java | 8 ++
.../fs/ozone/BasicOzoneClientAdapterImpl.java | 15 ++-
.../ozone/BasicRootedOzoneClientAdapterImpl.java | 15 ++-
36 files changed, 872 insertions(+), 103 deletions(-)
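For context, here is a minimal client-side sketch (not part of this patch) of the usage the change enables: the EC replication config is stored as a bucket-level default, and a key created without an explicit replication config inherits it. The OzoneClient handle plus the volume/bucket/key names and payload are illustrative assumptions; the calls used (BucketArgs.Builder#setDefaultReplicationConfig, OzoneBucket#createKey without a replication config) are the ones added or exercised in the diffs below.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;

public final class EcBucketDefaultSketch {
  // Hypothetical helper; "vol1", "ec-bucket" and "key1" are illustrative
  // names, not part of the patch.
  public static void writeWithBucketDefault(OzoneClient client)
      throws IOException {
    OzoneVolume volume = client.getObjectStore().getVolume("vol1");

    // Store EC 3+2 as the bucket-level default replication config.
    BucketArgs args = BucketArgs.newBuilder()
        .setDefaultReplicationConfig(
            new DefaultReplicationConfig(ReplicationType.EC,
                new ECReplicationConfig(3, 2)))
        .build();
    volume.createBucket("ec-bucket", args);

    // A key created without an explicit replication config now inherits
    // the bucket default instead of the old client-side setting.
    OzoneBucket bucket = volume.getBucket("ec-bucket");
    try (OzoneOutputStream out = bucket.createKey("key1", 2000)) {
      out.write("sample payload".getBytes(StandardCharsets.UTF_8));
    }
  }
}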
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/DefaultReplicationConfig.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/DefaultReplicationConfig.java
new file mode 100644
index 0000000..a11b7fc
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/DefaultReplicationConfig.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.client;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+import java.util.Objects;
+
+/**
+ * Bucket-level default replication configuration. It holds either a
+ * replication factor (RATIS/STAND_ALONE) or an EC replication config.
+ */
+public class DefaultReplicationConfig {
+
+ private ReplicationType type;
+ private ReplicationFactor factor;
+ private ECReplicationConfig ecReplicationConfig;
+
+ public DefaultReplicationConfig(ReplicationType type,
+ ReplicationFactor factor) {
+ this.type = type;
+ this.factor = factor;
+ this.ecReplicationConfig = null;
+ }
+
+ public DefaultReplicationConfig(ReplicationType type,
+ ECReplicationConfig ecReplicationConfig) {
+ this.type = type;
+ this.factor = null;
+ this.ecReplicationConfig = ecReplicationConfig;
+ }
+
+ public DefaultReplicationConfig(ReplicationType type,
+ ReplicationFactor factor, ECReplicationConfig ecReplicationConfig) {
+ this.type = type;
+ this.factor = factor;
+ this.ecReplicationConfig = ecReplicationConfig;
+ }
+
+ public ReplicationType getType() {
+ return this.type;
+ }
+
+ public ReplicationFactor getFactor() {
+ return this.factor;
+ }
+
+ public DefaultReplicationConfig copy() {
+ return new DefaultReplicationConfig(this.type, this.factor,
+ this.ecReplicationConfig);
+ }
+
+ public ECReplicationConfig getEcReplicationConfig() {
+ return this.ecReplicationConfig;
+ }
+
+ public int getRequiredNodes() {
+ if (this.type == ReplicationType.EC) {
+ return ecReplicationConfig.getRequiredNodes();
+ }
+ return this.factor.getValue();
+ }
+
+ public HddsProtos.DefaultReplicationConfig toProto() {
+ final HddsProtos.DefaultReplicationConfig.Builder builder =
+ HddsProtos.DefaultReplicationConfig.newBuilder()
+ .setType(ReplicationType.toProto(this.type));
+ if (this.factor != null) {
+ builder.setFactor(ReplicationFactor.toProto(this.factor));
+ }
+ if (this.ecReplicationConfig != null) {
+ builder.setEcReplicationConfig(this.ecReplicationConfig.toProto());
+ }
+ return builder.build();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DefaultReplicationConfig that = (DefaultReplicationConfig) o;
+ return Objects.equals(type, that.type) && Objects
+ .equals(factor, that.factor) && Objects
+ .equals(ecReplicationConfig, that.ecReplicationConfig);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, factor, ecReplicationConfig);
+ }
+}
+
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
index 044bd6f..6cb16a4 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
@@ -71,6 +71,22 @@ public enum ReplicationFactor {
}
}
+ public static HddsProtos.ReplicationFactor toProto(
+ ReplicationFactor replicationFactor) {
+ if (replicationFactor == null) {
+ return null;
+ }
+ switch (replicationFactor) {
+ case ONE:
+ return HddsProtos.ReplicationFactor.ONE;
+ case THREE:
+ return HddsProtos.ReplicationFactor.THREE;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported ProtoBuf replication factor: " + replicationFactor);
+ }
+ }
+
/**
* Returns integer representation of ReplicationFactor.
* @return replication value
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java
index 83f04bb..64969ea 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java
@@ -48,4 +48,24 @@ public enum ReplicationType {
"Unsupported ProtoBuf replication type: " + replicationType);
}
}
+
+ public static HddsProtos.ReplicationType toProto(
+ ReplicationType replicationType) {
+ if (replicationType == null) {
+ return null;
+ }
+ switch (replicationType) {
+ case RATIS:
+ return HddsProtos.ReplicationType.RATIS;
+ case STAND_ALONE:
+ return HddsProtos.ReplicationType.STAND_ALONE;
+ case CHAINED:
+ return HddsProtos.ReplicationType.CHAINED;
+ case EC:
+ return HddsProtos.ReplicationType.EC;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported replication type: " + replicationType);
+ }
+ }
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index eacd12a..c26655f 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1173,24 +1173,26 @@
</property>
<property>
- <name>ozone.replication</name>
+ <name>ozone.server.default.replication</name>
<value>3</value>
- <tag>OZONE, CLIENT</tag>
+ <tag>OZONE</tag>
<description>
Default replication value. The actual number of replications can be
specified when writing the key. The default is used if replication
- is not specified. Supported values: 1 and 3.
+ is not specified when creating the key and no default replication is
+ set on the bucket. Supported values: 1, 3 and EC_3_2.
</description>
</property>
<property>
- <name>ozone.replication.type</name>
+ <name>ozone.server.default.replication.type</name>
<value>RATIS</value>
- <tag>OZONE, CLIENT</tag>
+ <tag>OZONE</tag>
<description>
Default replication type to be used while writing key into ozone. The
value can be specified when writing the key, default is used when
- nothing is specified. Supported values: RATIS, STAND_ALONE and CHAINED.
+ nothing is specified when creating the key and no default is set on the bucket.
+ Supported values: RATIS, STAND_ALONE, CHAINED and EC.
</description>
</property>
<property>
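For reference (not part of the patch), a sketch of how a server-side component might resolve the OM-wide default from the renamed keys, using the OMConfigKeys constants added in this patch and the same ReplicationConfig.fromTypeAndString helper the patch uses in MockOmTransport; the class and method names here are illustrative only.

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMConfigKeys;

public final class ServerDefaultReplicationSketch {
  // Illustrative only: resolves ozone.server.default.replication[.type]
  // into a ReplicationConfig, falling back to RATIS / THREE.
  public static ReplicationConfig serverDefault(OzoneConfiguration conf) {
    String type = conf.get(
        OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY,
        OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT);
    String replication = conf.get(
        OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_KEY,
        OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT);
    return ReplicationConfig.fromTypeAndString(
        ReplicationType.valueOf(type), replication);
  }
}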
diff --git
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/TestReplicationConfig.java
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/TestReplicationConfig.java
index 1c75d48..a85415e 100644
---
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/TestReplicationConfig.java
+++
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/TestReplicationConfig.java
@@ -30,13 +30,10 @@ import org.junit.Test;
public class TestReplicationConfig {
@Test
- public void testGetDefaultShouldCreateReplicationConfigFromDefaultConf() {
+ public void testGetDefaultShouldReturnNullIfNotSetClientSide() {
OzoneConfiguration conf = new OzoneConfiguration();
ReplicationConfig replicationConfig = ReplicationConfig.getDefault(conf);
- Assert.assertEquals(
- org.apache.hadoop.hdds.client.ReplicationType.RATIS.name(),
- replicationConfig.getReplicationType().name());
- Assert.assertEquals(3, replicationConfig.getRequiredNodes());
+ Assert.assertNull(replicationConfig);
}
@Test
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 34d7fec..595fdd6 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -261,11 +261,13 @@ enum ReplicationType {
STAND_ALONE = 2;
CHAINED = 3;
EC = 4;
+ NONE = -1; // Invalid Type
}
enum ReplicationFactor {
ONE = 1;
THREE = 3;
+ ZERO = 0; // Invalid Factor
}
message ECReplicationConfig {
@@ -273,6 +275,12 @@ message ECReplicationConfig {
required int32 parity = 2;
}
+message DefaultReplicationConfig {
+ required ReplicationType type = 1;
+ optional ReplicationFactor factor = 2;
+ optional ECReplicationConfig ecReplicationConfig = 3;
+}
+
enum ScmOps {
allocateBlock = 1;
keyBlocksInfoList = 2;
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index b5fd0a7..f547116 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -263,6 +263,9 @@ public class TestRatisPipelineProvider {
largeContainerConf.set(OZONE_SCM_CONTAINER_SIZE, "100TB");
init(1, largeContainerConf);
for (ReplicationFactor factor: ReplicationFactor.values()) {
+ if (factor == ReplicationFactor.ZERO) {
+ continue;
+ }
try {
provider.create(new RatisReplicationConfig(factor));
Assert.fail("Expected SCMException for large container size with " +
@@ -276,6 +279,9 @@ public class TestRatisPipelineProvider {
largeMetadataConf.set(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN, "100TB");
init(1, largeMetadataConf);
for (ReplicationFactor factor: ReplicationFactor.values()) {
+ if (factor == ReplicationFactor.ZERO) {
+ continue;
+ }
try {
provider.create(new RatisReplicationConfig(factor));
Assert.fail("Expected SCMException for large metadata size with " +
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
index 7227034..34d599e 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.client;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -56,6 +57,7 @@ public final class BucketArgs {
* Bucket encryption key name.
*/
private String bucketEncryptionKey;
+ private DefaultReplicationConfig defaultReplicationConfig;
private final String sourceVolume;
private final String sourceBucket;
@@ -78,13 +80,15 @@ public final class BucketArgs {
* @param sourceBucket
* @param quotaInBytes Bucket quota in bytes.
* @param quotaInNamespace Bucket quota in counts.
- * @param bucketLayout Bucket Layouts.
+ * @param bucketLayout bucket layout.
+ * @param defaultReplicationConfig default replication config.
*/
@SuppressWarnings("parameternumber")
private BucketArgs(Boolean versioning, StorageType storageType,
List<OzoneAcl> acls, Map<String, String> metadata,
String bucketEncryptionKey, String sourceVolume, String sourceBucket,
- long quotaInBytes, long quotaInNamespace, BucketLayout bucketLayout) {
+ long quotaInBytes, long quotaInNamespace, BucketLayout bucketLayout,
+ DefaultReplicationConfig defaultReplicationConfig) {
this.acls = acls;
this.versioning = versioning;
this.storageType = storageType;
@@ -95,6 +99,7 @@ public final class BucketArgs {
this.quotaInBytes = quotaInBytes;
this.quotaInNamespace = quotaInNamespace;
this.bucketLayout = bucketLayout;
+ this.defaultReplicationConfig = defaultReplicationConfig;
}
/**
@@ -139,6 +144,14 @@ public final class BucketArgs {
}
/**
+ * Returns the bucket default replication config.
+ * @return bucket's default Replication Config.
+ */
+ public DefaultReplicationConfig getDefaultReplicationConfig() {
+ return this.defaultReplicationConfig;
+ }
+
+ /**
* Returns new builder class that builds a OmBucketInfo.
*
* @return Builder
@@ -192,6 +205,7 @@ public final class BucketArgs {
private long quotaInBytes;
private long quotaInNamespace;
private BucketLayout bucketLayout;
+ private DefaultReplicationConfig defaultReplicationConfig;
public Builder() {
metadata = new HashMap<>();
@@ -249,6 +263,12 @@ public final class BucketArgs {
return this;
}
+ public BucketArgs.Builder setDefaultReplicationConfig(
+ DefaultReplicationConfig defaultReplConfig) {
+ defaultReplicationConfig = defaultReplConfig;
+ return this;
+ }
+
/**
* Constructs the BucketArgs.
@@ -257,7 +277,7 @@ public final class BucketArgs {
public BucketArgs build() {
return new BucketArgs(versioning, storageType, acls, metadata,
bucketEncryptionKey, sourceVolume, sourceBucket, quotaInBytes,
- quotaInNamespace, bucketLayout);
+ quotaInNamespace, bucketLayout, defaultReplicationConfig);
}
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 8a8cde9..575612d 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
@@ -80,7 +82,7 @@ public class OzoneBucket extends WithMetadata {
/**
* Default replication factor to be used while creating keys.
*/
- private final ReplicationConfig defaultReplication;
+ private ReplicationConfig defaultReplication;
/**
* Type of storage to be used for this bucket.
@@ -221,6 +223,39 @@ public class OzoneBucket extends WithMetadata {
this.bucketLayout = bucketLayout;
}
+ @SuppressWarnings("parameternumber")
+ public OzoneBucket(ConfigurationSource conf, ClientProtocol proxy,
+ String volumeName, String bucketName, StorageType storageType,
+ Boolean versioning, long creationTime, long modificationTime,
+ Map<String, String> metadata, String encryptionKeyName,
+ String sourceVolume, String sourceBucket, long usedBytes,
+ long usedNamespace, long quotaInBytes, long quotaInNamespace,
+ BucketLayout bucketLayout,
+ DefaultReplicationConfig defaultReplicationConfig) {
+ this(conf, proxy, volumeName, bucketName, storageType, versioning,
+ creationTime, modificationTime, metadata, encryptionKeyName,
+ sourceVolume, sourceBucket, usedBytes, usedNamespace, quotaInBytes,
+ quotaInNamespace);
+ this.bucketLayout = bucketLayout;
+ if (defaultReplicationConfig != null) {
+ this.defaultReplication =
+ defaultReplicationConfig.getType() == ReplicationType.EC ?
+ defaultReplicationConfig.getEcReplicationConfig() :
+ ReplicationConfig
+ .fromTypeAndFactor(defaultReplicationConfig.getType(),
+ defaultReplicationConfig.getFactor());
+ } else {
+ // This can happen when talking to an old server, so fall back to the
+ // old client-side defaults.
+ this.defaultReplication = ReplicationConfig.fromTypeAndString(
+ ReplicationType.valueOf(
+ conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
+ OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT)),
+ conf.get(OzoneConfigKeys.OZONE_REPLICATION,
+ OzoneConfigKeys.OZONE_REPLICATION_DEFAULT));
+ }
+ }
+
/**
* Constructs OzoneBucket instance.
* @param conf Configuration object.
@@ -1219,4 +1254,8 @@ public class OzoneBucket extends WithMetadata {
public BucketLayout getBucketLayout() {
return bucketLayout;
}
+
+ public ReplicationConfig getReplicationConfig() {
+ return this.defaultReplication;
+ }
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index c5c9899..f0f5dee 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
@@ -500,6 +501,12 @@ public class RpcClient implements ClientProtocol {
builder.setBucketEncryptionKey(bek);
}
+ DefaultReplicationConfig defaultReplicationConfig =
+ bucketArgs.getDefaultReplicationConfig();
+ if (defaultReplicationConfig != null) {
+ builder.setDefaultReplicationConfig(defaultReplicationConfig);
+ }
+
LOG.info("Creating Bucket: {}/{}, with Versioning {} and " +
"Storage Type set to {} and Encryption set to {} ",
volumeName, bucketName, isVersionEnabled, storageType, bek != null);
@@ -718,7 +725,8 @@ public class RpcClient implements ClientProtocol {
bucketInfo.getUsedNamespace(),
bucketInfo.getQuotaInBytes(),
bucketInfo.getQuotaInNamespace(),
- bucketInfo.getBucketLayout()
+ bucketInfo.getBucketLayout(),
+ bucketInfo.getDefaultReplicationConfig()
);
}
@@ -771,7 +779,7 @@ public class RpcClient implements ClientProtocol {
if (checkKeyNameEnabled) {
HddsClientUtils.verifyKeyName(keyName);
}
- HddsClientUtils.checkNotNull(keyName, replicationConfig);
+ HddsClientUtils.checkNotNull(keyName);
String requestId = UUID.randomUUID().toString();
OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
@@ -1394,14 +1402,14 @@ public class RpcClient implements ClientProtocol {
keyOutputStream = new ECKeyOutputStream.Builder().setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient).setRequestID(requestId)
- .setReplicationConfig(replicationConfig)
+ .setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig).build();
} else {
keyOutputStream = new KeyOutputStream.Builder().setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient).setRequestID(requestId)
- .setReplicationConfig(replicationConfig)
+ .setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig).build();
}
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
index f38ee35..1da0c2d 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
@@ -17,6 +17,12 @@
*/
package org.apache.hadoop.ozone.client;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -53,6 +59,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
+
/**
* OM transport for testing with in-memory state.
*/
@@ -165,33 +174,96 @@ public class MockOmTransport implements OmTransport {
private CreateKeyResponse createKey(CreateKeyRequest createKeyRequest) {
final KeyArgs keyArgs = createKeyRequest.getKeyArgs();
final long now = System.currentTimeMillis();
- final KeyInfo keyInfo = KeyInfo.newBuilder()
- .setVolumeName(keyArgs.getVolumeName())
- .setBucketName(keyArgs.getBucketName())
- .setKeyName(keyArgs.getKeyName())
- .setCreationTime(now)
- .setModificationTime(now)
- .setType(keyArgs.getType())
- .setFactor(keyArgs.getFactor())
- .setDataSize(keyArgs.getDataSize())
- .setLatestVersion(0L)
- .addKeyLocationList(KeyLocationList.newBuilder()
- .addAllKeyLocations(
+ final BucketInfo bucketInfo =
+ buckets.get(keyArgs.getVolumeName()).get(keyArgs.getBucketName());
+
+ final KeyInfo.Builder keyInfoBuilder =
+ KeyInfo.newBuilder().setVolumeName(keyArgs.getVolumeName())
+ .setBucketName(keyArgs.getBucketName())
+ .setKeyName(keyArgs.getKeyName()).setCreationTime(now)
+ .setModificationTime(now).setDataSize(keyArgs.getDataSize())
+ .setLatestVersion(0L).addKeyLocationList(
+ KeyLocationList.newBuilder().addAllKeyLocations(
blockAllocator.allocateBlock(createKeyRequest.getKeyArgs()))
- .build())
- .build();
+ .build());
+
+ if (keyArgs.getType() == HddsProtos.ReplicationType.NONE) {
+ // Client did not pass a replication config, so try the bucket defaults.
+ if (bucketInfo.getDefaultReplicationConfig() != null) {
+ // Bucket defaults are available, so inherit them.
+ final HddsProtos.ReplicationType type =
+ bucketInfo.getDefaultReplicationConfig().getType();
+ keyInfoBuilder
+ .setType(bucketInfo.getDefaultReplicationConfig().getType());
+ switch (type) {
+ case EC:
+ keyInfoBuilder.setEcReplicationConfig(
+ bucketInfo.getDefaultReplicationConfig()
+ .getEcReplicationConfig());
+ break;
+ case RATIS:
+ case STAND_ALONE:
+ keyInfoBuilder
+ .setFactor(bucketInfo.getDefaultReplicationConfig().getFactor());
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown replication type: " + type);
+ }
+ } else {
+ keyInfoBuilder.setType(HddsProtos.ReplicationType.RATIS);
+ keyInfoBuilder.setFactor(HddsProtos.ReplicationFactor.THREE);
+ }
+ } else {
+ // Client passed a replication config, so use it.
+ final HddsProtos.ReplicationType type = keyArgs.getType();
+ keyInfoBuilder.setType(type);
+ switch (type) {
+ case EC:
+ keyInfoBuilder.setEcReplicationConfig(keyArgs.getEcReplicationConfig());
+ break;
+ case RATIS:
+ case STAND_ALONE:
+ keyInfoBuilder.setFactor(keyArgs.getFactor());
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown replication type: " + type);
+ }
+ }
+
+ final KeyInfo keyInfo = keyInfoBuilder.build();
openKeys.get(keyInfo.getVolumeName()).get(keyInfo.getBucketName())
.put(keyInfo.getKeyName(), keyInfo);
- return CreateKeyResponse.newBuilder()
- .setOpenVersion(0L)
- .setKeyInfo(keyInfo)
+ return CreateKeyResponse.newBuilder().setOpenVersion(0L).setKeyInfo(keyInfo)
.build();
}
private InfoBucketResponse infoBucket(InfoBucketRequest infoBucketRequest) {
+ BucketInfo bucketInfo = buckets.get(infoBucketRequest.getVolumeName())
+ .get(infoBucketRequest.getBucketName());
+ if (!bucketInfo.hasDefaultReplicationConfig()) {
+ final ReplicationConfig replicationConfig = ReplicationConfig
+ .fromTypeAndString(ReplicationType
+ .valueOf(OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT),
+ OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT);
+
+ bucketInfo = bucketInfo.toBuilder().setDefaultReplicationConfig(
+ new DefaultReplicationConfig(
+ ReplicationType.fromProto(replicationConfig.getReplicationType()),
+ replicationConfig
+ .getReplicationType() != HddsProtos.ReplicationType.EC ?
+ ReplicationFactor
+ .valueOf(replicationConfig.getRequiredNodes()) :
+ null, replicationConfig
+ .getReplicationType() == HddsProtos.ReplicationType.EC ?
+ (ECReplicationConfig) replicationConfig :
+ null).toProto()).build();
+ }
return InfoBucketResponse.newBuilder()
- .setBucketInfo(buckets.get(infoBucketRequest.getVolumeName())
- .get(infoBucketRequest.getBucketName()))
+ .setBucketInfo(bucketInfo)
.build();
}
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 56e7d34..70d4406 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.ozone.client;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.InMemoryConfiguration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -31,6 +33,7 @@ import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
@@ -122,7 +125,7 @@ public class TestOzoneECClient {
@Test
public void testPutECKeyAndCheckDNStoredData() throws IOException {
- OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+ OzoneBucket bucket = writeIntoECKey(inputChunks, keyName, null);
OzoneKey key = bucket.getKey(keyName);
Assert.assertEquals(keyName, key.getName());
Map<DatanodeDetails, MockDatanodeStorage> storages =
@@ -142,7 +145,7 @@ public class TestOzoneECClient {
@Test
public void testPutECKeyAndCheckParityData() throws IOException {
- OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+ OzoneBucket bucket = writeIntoECKey(inputChunks, keyName, null);
final ByteBuffer[] dataBuffers = new ByteBuffer[3];
for (int i = 0; i < inputChunks.length; i++) {
dataBuffers[i] = ByteBuffer.wrap(inputChunks[i]);
@@ -174,7 +177,7 @@ public class TestOzoneECClient {
@Test
public void testPutECKeyAndReadContent() throws IOException {
- OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+ OzoneBucket bucket = writeIntoECKey(inputChunks, keyName, null);
OzoneKey key = bucket.getKey(keyName);
Assert.assertEquals(keyName, key.getName());
try (OzoneInputStream is = bucket.readKey(keyName)) {
@@ -205,17 +208,41 @@ public class TestOzoneECClient {
}
}
- private OzoneBucket writeIntoECKey(byte[][] chunks, String key)
+ @Test
+ public void testCreateBucketWithDefaultReplicationConfig()
throws IOException {
+ final OzoneBucket bucket = writeIntoECKey(inputChunks, keyName,
+ new DefaultReplicationConfig(ReplicationType.EC,
+ new ECReplicationConfig(dataBlocks, parityBlocks)));
+
+ // Create a key without specifying a replication config. Since the bucket
+ // default is EC, the key should be an EC key.
+ try (OzoneOutputStream out = bucket.createKey("mykey", 2000)) {
+ Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
+ for (int i = 0; i < inputChunks.length; i++) {
+ out.write(inputChunks[i]);
+ }
+ }
+ }
+
+ private OzoneBucket writeIntoECKey(byte[][] chunks, String key,
+ DefaultReplicationConfig defaultReplicationConfig) throws IOException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
store.createVolume(volumeName);
OzoneVolume volume = store.getVolume(volumeName);
- volume.createBucket(bucketName);
+ if (defaultReplicationConfig != null) {
+ final BucketArgs.Builder builder = BucketArgs.newBuilder();
+ builder.setDefaultReplicationConfig(defaultReplicationConfig);
+ volume.createBucket(bucketName, builder.build());
+ } else {
+ volume.createBucket(bucketName);
+ }
OzoneBucket bucket = volume.getBucket(bucketName);
try (OzoneOutputStream out = bucket.createKey(key, 2000,
- new ECReplicationConfig(dataBlocks, parityBlocks), new HashMap<>())) {
+ new ECReplicationConfig(3, 2), new HashMap<>())) {
for (int i = 0; i < chunks.length; i++) {
out.write(chunks[i]);
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 563f99c..751b9c9 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.om;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.ratis.util.TimeDuration;
/**
@@ -240,6 +242,16 @@ public final class OMConfigKeys {
public static final boolean OZONE_OM_ENABLE_FILESYSTEM_PATHS_DEFAULT =
false;
+ public static final String OZONE_SERVER_DEFAULT_REPLICATION_KEY =
+ "ozone.server.default.replication";
+ public static final String OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT =
+ ReplicationFactor.THREE.toString();
+
+ public static final String OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY =
+ "ozone.server.default.replication.type";
+ public static final String OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT =
+ ReplicationType.RATIS.toString();
+
public static final String OZONE_OM_HA_PREFIX = "ozone.om.ha";
public static final String OZONE_FS_TRASH_INTERVAL_KEY =
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
index b1b1102..a1da197 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
@@ -27,7 +27,13 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.Auditable;
@@ -75,6 +81,11 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
*/
private BucketEncryptionKeyInfo bekInfo;
+ /**
+ * Optional default replication for bucket.
+ */
+ private DefaultReplicationConfig defaultReplicationConfig;
+
private final String sourceVolume;
private final String sourceBucket;
@@ -107,7 +118,8 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
* @param usedBytes - Bucket Quota Usage in bytes.
* @param quotaInBytes Bucket quota in bytes.
* @param quotaInNamespace Bucket quota in counts.
- * @param bucketLayout Bucket Layout.
+ * @param bucketLayout bucket layout.
+ * @param defaultReplicationConfig default replication config.
*/
@SuppressWarnings("checkstyle:ParameterNumber")
private OmBucketInfo(String volumeName,
@@ -127,7 +139,8 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
long usedNamespace,
long quotaInBytes,
long quotaInNamespace,
- BucketLayout bucketLayout) {
+ BucketLayout bucketLayout,
+ DefaultReplicationConfig defaultReplicationConfig) {
this.volumeName = volumeName;
this.bucketName = bucketName;
this.acls = acls;
@@ -146,6 +159,7 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
this.quotaInBytes = quotaInBytes;
this.quotaInNamespace = quotaInNamespace;
this.bucketLayout = bucketLayout;
+ this.defaultReplicationConfig = defaultReplicationConfig;
}
/**
@@ -245,12 +259,22 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
/**
* Returns the Bucket Layout.
+ *
* @return BucketLayout.
*/
public BucketLayout getBucketLayout() {
return bucketLayout;
}
+ /**
+ * Returns the bucket's default replication config.
+ *
+ * @return default replication config.
+ */
+ public DefaultReplicationConfig getDefaultReplicationConfig() {
+ return defaultReplicationConfig;
+ }
+
public String getSourceVolume() {
return sourceVolume;
}
@@ -340,6 +364,10 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
acl.getName(), (BitSet) acl.getAclBitSet().clone(),
acl.getAclScope())));
+ if (defaultReplicationConfig != null) {
+ builder.setDefaultReplicationConfig(defaultReplicationConfig.copy());
+ }
+
return builder.build();
}
@@ -362,7 +390,21 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
.setUsedNamespace(usedNamespace)
.setQuotaInBytes(quotaInBytes)
.setQuotaInNamespace(quotaInNamespace)
- .setBucketLayout(bucketLayout);
+ .setBucketLayout(bucketLayout)
+ .setDefaultReplicationConfig(defaultReplicationConfig);
+ }
+
+ public void setDefaultReplicationConfig(ReplicationConfig replicationConfig) {
+ this.defaultReplicationConfig = new DefaultReplicationConfig(
+ ReplicationType.fromProto(replicationConfig.getReplicationType()),
+ replicationConfig
+ .getReplicationType() == HddsProtos.ReplicationType.EC ?
+ null :
+ ReplicationFactor.valueOf(replicationConfig.getRequiredNodes()),
+ replicationConfig
+ .getReplicationType() == HddsProtos.ReplicationType.EC ?
+ ((ECReplicationConfig) replicationConfig) :
+ null);
}
/**
@@ -387,6 +429,7 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
private long quotaInBytes;
private long quotaInNamespace;
private BucketLayout bucketLayout;
+ private DefaultReplicationConfig defaultReplicationConfig;
public Builder() {
//Default values
@@ -510,6 +553,12 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
return this;
}
+ public Builder setDefaultReplicationConfig(
+ DefaultReplicationConfig defaultReplConfig) {
+ this.defaultReplicationConfig = defaultReplConfig;
+ return this;
+ }
+
/**
* Constructs the OmBucketInfo.
* @return instance of OmBucketInfo.
@@ -520,11 +569,11 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
Preconditions.checkNotNull(acls);
Preconditions.checkNotNull(isVersionEnabled);
Preconditions.checkNotNull(storageType);
-
return new OmBucketInfo(volumeName, bucketName, acls, isVersionEnabled,
storageType, creationTime, modificationTime, objectID, updateID,
metadata, bekInfo, sourceVolume, sourceBucket, usedBytes,
- usedNamespace, quotaInBytes, quotaInNamespace, bucketLayout);
+ usedNamespace, quotaInBytes, quotaInNamespace, bucketLayout,
+ defaultReplicationConfig);
}
}
@@ -553,6 +602,9 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
if (bekInfo != null && bekInfo.getKeyName() != null) {
bib.setBeinfo(OMPBHelper.convert(bekInfo));
}
+ if (defaultReplicationConfig != null) {
+ bib.setDefaultReplicationConfig(defaultReplicationConfig.toProto());
+ }
if (sourceVolume != null) {
bib.setSourceVolume(sourceVolume);
}
@@ -598,6 +650,10 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
obib.setBucketLayout(
BucketLayout.fromProto(bucketInfo.getBucketLayout()));
}
+ if (bucketInfo.hasDefaultReplicationConfig()) {
+ obib.setDefaultReplicationConfig(
+ OMPBHelper.convert(bucketInfo.getDefaultReplicationConfig()));
+ }
if (bucketInfo.hasObjectID()) {
obib.setObjectID(bucketInfo.getObjectID());
}
@@ -637,6 +693,7 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
", quotaInBytes='" + quotaInBytes + "'" +
", quotaInNamespace='" + quotaInNamespace + "'" +
", bucketLayout='" + bucketLayout + '\'' +
+ ", defaultReplicationConfig='" + defaultReplicationConfig + '\'' +
sourceInfo +
'}';
}
@@ -664,7 +721,8 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
Objects.equals(sourceVolume, that.sourceVolume) &&
Objects.equals(sourceBucket, that.sourceBucket) &&
Objects.equals(metadata, that.metadata) &&
- Objects.equals(bekInfo, that.bekInfo);
+ Objects.equals(bekInfo, that.bekInfo) && Objects
+ .equals(defaultReplicationConfig, that.defaultReplicationConfig);
}
@Override
@@ -692,6 +750,7 @@ public final class OmBucketInfo extends WithObjectID
implements Auditable {
", quotaInBytes=" + quotaInBytes +
", quotaInNamespace=" + quotaInNamespace +
", bucketLayout=" + bucketLayout +
+ ", defaultReplicationConfig=" + defaultReplicationConfig +
'}';
}
}
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
index 2ff2dc8..c943341 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
@@ -21,6 +21,11 @@ import com.google.protobuf.ByteString;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -141,6 +146,55 @@ public final class OMPBHelper {
ezKeyVersionName);
}
+ public static DefaultReplicationConfig convert(
+ HddsProtos.DefaultReplicationConfig defaultReplicationConfig) {
+ if (defaultReplicationConfig == null) {
+ throw new IllegalArgumentException(
+ "Invalid argument: default replication config" + " is null");
+ }
+
+ final ReplicationType type =
+ ReplicationType.fromProto(defaultReplicationConfig.getType());
+ DefaultReplicationConfig defaultReplicationConfigObj = null;
+ switch (type) {
+ case EC:
+ defaultReplicationConfigObj = new DefaultReplicationConfig(type,
+ new ECReplicationConfig(
+ defaultReplicationConfig.getEcReplicationConfig()));
+ break;
+ default:
+ final ReplicationFactor factor =
+ ReplicationFactor.fromProto(defaultReplicationConfig.getFactor());
+ defaultReplicationConfigObj = new DefaultReplicationConfig(type, factor);
+ }
+ return defaultReplicationConfigObj;
+ }
+
+ public static HddsProtos.DefaultReplicationConfig convert(
+ DefaultReplicationConfig defaultReplicationConfig) {
+ if (defaultReplicationConfig == null) {
+ throw new IllegalArgumentException(
+ "Invalid argument: default replication config" + " is null");
+ }
+
+ final HddsProtos.DefaultReplicationConfig.Builder builder =
+ HddsProtos.DefaultReplicationConfig.newBuilder();
+ builder.setType(
+ ReplicationType.toProto(defaultReplicationConfig.getType()));
+
+ if (defaultReplicationConfig.getFactor() != null) {
+ builder.setFactor(ReplicationFactor
+ .toProto(defaultReplicationConfig.getFactor()));
+ }
+
+ if (defaultReplicationConfig.getEcReplicationConfig() != null) {
+ builder.setEcReplicationConfig(
+ defaultReplicationConfig.getEcReplicationConfig().toProto());
+ }
+
+ return builder.build();
+ }
+
public static CipherSuite convert(CipherSuiteProto proto) {
switch(proto) {
case AES_CTR_NOPADDING:
diff --git
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
index 650fc91..a742ab9 100644
---
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
+++
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.ozone.om.helpers;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.junit.Assert;
import org.junit.Test;
@@ -111,4 +115,52 @@ public class TestOmBucketInfo {
Assert.assertEquals((int) 1, cloneBucketInfo.getAcls().size());
}
+
+ @Test
+ public void getProtobufMessageEC() {
+ OmBucketInfo omBucketInfo =
+ OmBucketInfo.newBuilder().setBucketName("bucket").setVolumeName("vol1")
+ .setCreationTime(Time.now()).setIsVersionEnabled(false)
+ .setStorageType(StorageType.ARCHIVE).setAcls(Collections
+ .singletonList(new OzoneAcl(IAccessAuthorizer.ACLIdentityType.USER,
+ "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
+ OzoneAcl.AclScope.ACCESS))).build();
+ OzoneManagerProtocolProtos.BucketInfo protobuf = omBucketInfo.getProtobuf();
+ // No EC Config
+ Assert.assertFalse(protobuf.hasDefaultReplicationConfig());
+
+ // Reconstruct object from Proto
+ OmBucketInfo recovered = OmBucketInfo.getFromProtobuf(protobuf);
+ Assert.assertNull(recovered.getDefaultReplicationConfig());
+
+ // EC Config
+ omBucketInfo =
+ OmBucketInfo.newBuilder().setBucketName("bucket").setVolumeName("vol1")
+ .setCreationTime(Time.now()).setIsVersionEnabled(false)
+ .setStorageType(StorageType.ARCHIVE).setAcls(Collections
+ .singletonList(new OzoneAcl(IAccessAuthorizer.ACLIdentityType.USER,
+ "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
+ OzoneAcl.AclScope.ACCESS))).setDefaultReplicationConfig(
+ new DefaultReplicationConfig(ReplicationType.EC,
+ new ECReplicationConfig(3, 2))).build();
+ protobuf = omBucketInfo.getProtobuf();
+
+ Assert.assertTrue(protobuf.hasDefaultReplicationConfig());
+ Assert.assertEquals(3,
+ protobuf.getDefaultReplicationConfig().getEcReplicationConfig()
+ .getData());
+ Assert.assertEquals(2,
+ protobuf.getDefaultReplicationConfig().getEcReplicationConfig()
+ .getParity());
+
+ // Reconstruct object from Proto
+ recovered = OmBucketInfo.getFromProtobuf(protobuf);
+ Assert.assertEquals(ReplicationType.EC,
+ recovered.getDefaultReplicationConfig().getType());
+ ECReplicationConfig config =
+ recovered.getDefaultReplicationConfig().getEcReplicationConfig();
+ Assert.assertNotNull(config);
+ Assert.assertEquals(3, config.getData());
+ Assert.assertEquals(2, config.getParity());
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index c78b746..1ee7bce 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -58,11 +58,17 @@ public class TestOzoneConfigurationFields extends
TestConfigurationFieldsBase {
xmlPropsToSkipCompare.add("hadoop.tags.custom");
xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID");
xmlPropsToSkipCompare.add("ozone.scm.nodes.EXAMPLESCMSERVICEID");
xmlPrefixToSkipCompare.add("ipc.client.rpc-timeout.ms");
xmlPropsToSkipCompare.add("ozone.om.leader.election.minimum.timeout" +
".duration"); // Deprecated config
configurationPropsToSkipCompare
.add(ScmConfig.ConfigStrings.HDDS_SCM_INIT_DEFAULT_LAYOUT_VERSION);
+ // The replication and replication type configs have moved to the server side.
+ configurationPropsToSkipCompare
+ .add(OzoneConfigKeys.OZONE_REPLICATION);
+ configurationPropsToSkipCompare
+ .add(OzoneConfigKeys.OZONE_REPLICATION_TYPE);
addPropertiesNotInXml();
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 06f0646..28f7c2c 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -18,7 +18,10 @@ package org.apache.hadoop.ozone.client.rpc;
import com.google.common.cache.Cache;
import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -27,11 +30,15 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.TestHelper;
import org.junit.AfterClass;
@@ -40,12 +47,14 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
@@ -64,6 +73,9 @@ public class TestECKeyOutputStream {
private static String volumeName;
private static String bucketName;
private static String keyString;
+ private static int dataBlocks = 3;
+ private static int parityBlocks = 2;
+ private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
/**
* Create a MiniDFSCluster for testing.
@@ -99,6 +111,7 @@ public class TestECKeyOutputStream {
bucketName = volumeName;
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
+ initInputChunks();
}
/**
@@ -121,6 +134,61 @@ public class TestECKeyOutputStream {
}
@Test
+ public void testCreateKeyWithOutBucketDefaults() throws Exception {
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ try (OzoneOutputStream out = bucket.createKey("myKey", 2000)) {
+ Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
+ for (int i = 0; i < inputChunks.length; i++) {
+ out.write(inputChunks[i]);
+ }
+ }
+ }
+
+ @Test
+ public void testCreateKeyWithBucketDefaults() throws Exception {
+ String myBucket = UUID.randomUUID().toString();
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
+ bucketArgs.setDefaultReplicationConfig(
+ new DefaultReplicationConfig(ReplicationType.EC,
+ new ECReplicationConfig(3, 2)));
+
+ volume.createBucket(myBucket, bucketArgs.build());
+ OzoneBucket bucket = volume.getBucket(myBucket);
+
+ try (OzoneOutputStream out = bucket.createKey(keyString, 2000)) {
+ Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
+ for (int i = 0; i < inputChunks.length; i++) {
+ out.write(inputChunks[i]);
+ }
+ }
+ }
+
+ @Test
+ public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
+ String myBucket = UUID.randomUUID().toString();
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
+ bucketArgs.setDefaultReplicationConfig(
+ new DefaultReplicationConfig(ReplicationType.EC,
+ new ECReplicationConfig(3, 2)));
+
+ volume.createBucket(myBucket, bucketArgs.build());
+ OzoneBucket bucket = volume.getBucket(myBucket);
+ try (OzoneOutputStream out = bucket
+ .createKey("testCreateRatisKeyAndWithECBucketDefaults", 2000,
+ new RatisReplicationConfig("3"), new HashMap<>())) {
+ Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
+ for (int i = 0; i < inputChunks.length; i++) {
+ out.write(inputChunks[i]);
+ }
+ }
+ }
+
+ @Test
public void testECKeyXceiverClientShouldNotUseCachedKeysForDifferentStreams()
throws Exception {
int data = 3;
@@ -131,14 +199,16 @@ public class TestECKeyOutputStream {
final List<BlockOutputStreamEntry> streamEntries =
((ECKeyOutputStream) key.getOutputStream()).getStreamEntries();
Assert.assertEquals(data + parity, streamEntries.size());
+ final Cache<String, XceiverClientSpi> clientCache =
+ ((XceiverClientManager) ((ECKeyOutputStream) key.getOutputStream())
+ .getXceiverClientFactory()).getClientCache();
+ clientCache.invalidateAll();
+ clientCache.cleanUp();
final Pipeline firstStreamPipeline = streamEntries.get(0).getPipeline();
XceiverClientSpi xceiverClientSpi =
((ECKeyOutputStream) key.getOutputStream()).getXceiverClientFactory()
.acquireClient(firstStreamPipeline);
Assert.assertNotNull(xceiverClientSpi);
- final Cache<String, XceiverClientSpi> clientCache =
- ((XceiverClientManager) ((ECKeyOutputStream) key.getOutputStream())
- .getXceiverClientFactory()).getClientCache();
final String firstCacheKey =
clientCache.asMap().entrySet().iterator().next().getKey();
List<String> prevVisitedKeys = new ArrayList<>();
@@ -172,4 +242,18 @@ public class TestECKeyOutputStream {
}
return null;
}
+
+ private static void initInputChunks() {
+ for (int i = 0; i < dataBlocks; i++) {
+ inputChunks[i] = getBytesWith(i + 1, chunkSize);
+ }
+ }
+
+ private static byte[] getBytesWith(int singleDigitNumber, int total) {
+ StringBuilder builder = new StringBuilder(total);
+ for (int i = 1; i <= total; i++) {
+ builder.append(singleDigitNumber);
+ }
+ return builder.toString().getBytes(UTF_8);
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
index ac43c24..e8c0953 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.ozone.security.acl.IOzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
import org.apache.hadoop.ozone.security.acl.RequestContext;
import org.apache.ozone.test.GenericTestUtils;
@@ -144,7 +145,6 @@ public class TestOmAcls {
logCapturer.clearOutput();
TestOmAcls.aclAllow = false;
-
OzoneTestUtils.expectOmException(ResultCodes.PERMISSION_DENIED,
() -> TestDataUtil.createKey(bucket, "testKey", "testcontent"));
assertTrue(logCapturer.getOutput().contains("doesn't have CREATE " +
@@ -158,6 +158,13 @@ public class TestOmAcls {
@Override
public boolean checkAccess(IOzoneObj ozoneObject, RequestContext context) {
+ // Allow bucket read access. While creating a key, the bucket is read to
+ // inherit its default replication config.
+ if (((OzoneObjInfo) ozoneObject).getResourceType().toString()
+ .equals("bucket") && "READ"
+ .equals(context.getAclRights().toString())) {
+ return true;
+ }
return TestOmAcls.aclAllow;
}
}
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index db19c37..da1a6b0 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -543,6 +543,7 @@ message BucketInfo {
optional int64 quotaInNamespace = 16 [default = -2];
optional uint64 usedNamespace = 17;
optional BucketLayoutProto bucketLayout = 18;
+ optional hadoop.hdds.DefaultReplicationConfig defaultReplicationConfig = 19;
}
enum StorageTypeProto {
@@ -620,6 +621,7 @@ message BucketArgs {
repeated hadoop.hdds.KeyValue metadata = 7;
optional uint64 quotaInBytes = 8;
optional uint64 quotaInNamespace = 9;
+ optional hadoop.hdds.DefaultReplicationConfig defaultReplicationConfig = 10;
}
message PrefixInfo {
@@ -756,8 +758,8 @@ message KeyArgs {
required string bucketName = 2;
required string keyName = 3;
optional uint64 dataSize = 4;
- optional hadoop.hdds.ReplicationType type = 5;
- optional hadoop.hdds.ReplicationFactor factor = 6;
+ optional hadoop.hdds.ReplicationType type = 5 [default = NONE];
+ optional hadoop.hdds.ReplicationFactor factor = 6 [default = ZERO];
repeated KeyLocation keyLocations = 7;
optional bool isMultipartKey = 8;
optional string multipartUploadID = 9;
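Note on the KeyArgs change above: with proto2 semantics an optional enum the
client never sets reads back as its declared default, so NONE/ZERO now act as
the sentinel for "client did not request a specific replication". A minimal
illustration (the builder usage below is a sketch, not part of this patch):

    // Illustrative only: a KeyArgs built without type/factor now reads back
    // NONE/ZERO, which the OM request handlers treat as "resolve from the
    // bucket or server defaults" rather than an explicit client choice.
    OzoneManagerProtocolProtos.KeyArgs args =
        OzoneManagerProtocolProtos.KeyArgs.newBuilder()
            .setVolumeName("vol1").setBucketName("buck1").setKeyName("key1")
            .build();
    boolean clientSpecifiedReplication =
        args.getType() != HddsProtos.ReplicationType.NONE;   // false here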
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneConfigUtil.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneConfigUtil.java
new file mode 100644
index 0000000..f8a2add
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneConfigUtil.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * Utility class for ozone configurations.
+ */
+public final class OzoneConfigUtil {
+ private OzoneConfigUtil() {
+ }
+
+ public static ReplicationConfig resolveReplicationConfigPreference(
+ HddsProtos.ReplicationType clientType,
+ HddsProtos.ReplicationFactor clientFactor,
+ HddsProtos.ECReplicationConfig clientECReplicationConfig,
+ DefaultReplicationConfig bucketDefaultReplicationConfig,
+ ReplicationConfig omDefaultReplicationConfig) {
+ ReplicationConfig replicationConfig = null;
+ if (clientType != HddsProtos.ReplicationType.NONE) {
+ // Client passed the replication config, so let's use it.
+ replicationConfig = ReplicationConfig
+ .fromProto(clientType, clientFactor, clientECReplicationConfig);
+ } else {
+ // Type is NONE, so look at the bucket defaults.
+ if (bucketDefaultReplicationConfig != null) {
+ // Bucket defaults are available, so inherit them.
+ replicationConfig = ReplicationConfig.fromProto(
+ ReplicationType.toProto(bucketDefaultReplicationConfig.getType()),
+ ReplicationFactor
+ .toProto(bucketDefaultReplicationConfig.getFactor()),
+ bucketDefaultReplicationConfig.getEcReplicationConfig().toProto());
+ } else {
+ // If bucket defaults are not available either, use the server defaults.
+ replicationConfig = omDefaultReplicationConfig;
+ }
+ }
+ return replicationConfig;
+ }
+}
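For readers skimming the patch, a minimal sketch of the preference order the
new helper encodes: an explicit client choice wins, otherwise the bucket
default, otherwise the OM-wide default. The RatisReplicationConfig
construction mirrors the StandaloneReplicationConfig usage elsewhere in this
patch and is illustrative only:

    // Sketch, not part of the patch.
    ReplicationConfig omDefault =
        new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);

    // Client sent type NONE and the bucket carries no default:
    // the OM-wide default wins.
    ReplicationConfig resolved =
        OzoneConfigUtil.resolveReplicationConfigPreference(
            HddsProtos.ReplicationType.NONE,
            HddsProtos.ReplicationFactor.ZERO,
            null /* no EC config from the client */,
            null /* bucket has no DefaultReplicationConfig */,
            omDefault);
    // resolved == omDefault

    // Client explicitly asked for STAND_ALONE/ONE: the client choice wins.
    resolved = OzoneConfigUtil.resolveReplicationConfigPreference(
        HddsProtos.ReplicationType.STAND_ALONE,
        HddsProtos.ReplicationFactor.ONE,
        null, null, omDefault);
    // resolved is a standalone config with factor ONE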
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b144371..4da72c3 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -60,6 +60,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
@@ -227,6 +229,10 @@ import static
org.apache.hadoop.ozone.OzoneConsts.PREPARE_MARKER_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_KEY;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS_DEFAULT;
@@ -2441,7 +2447,13 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
auditMap.put(OzoneConsts.BUCKET, bucket);
try {
metrics.incNumBucketInfos();
- return bucketManager.getBucketInfo(volume, bucket);
+ final OmBucketInfo bucketInfo =
+ bucketManager.getBucketInfo(volume, bucket);
+ if (bucketInfo != null && bucketInfo
+ .getDefaultReplicationConfig() == null) {
+ bucketInfo.setDefaultReplicationConfig(getDefaultReplicationConfig());
+ }
+ return bucketInfo;
} catch (Exception ex) {
metrics.incNumBucketInfoFails();
auditSuccess = false;
@@ -4060,6 +4072,15 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
.getTrimmed(OZONE_OM_METADATA_LAYOUT,
OZONE_OM_METADATA_LAYOUT_DEFAULT);
}
+ public ReplicationConfig getDefaultReplicationConfig() {
+ String replication = configuration.get(OZONE_SERVER_DEFAULT_REPLICATION_KEY,
+ OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT);
+ String type = configuration.get(OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY,
+ OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT);
+ return ReplicationConfig
+ .fromTypeAndString(ReplicationType.valueOf(type), replication);
+ }
+
/**
* Create volume which is required for S3Gateway operations.
* @throws IOException
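The getDefaultReplicationConfig() method added above turns two server-side
settings into the OM-wide fallback used when neither the client nor the bucket
specifies a replication config. A hedged sketch of how that fallback would be
configured; the OMConfigKeys constants are the ones imported above, while the
literal value strings are assumptions:

    // Sketch only. Whether the factor is spelled "THREE" or "3" depends on
    // the defaults declared in OMConfigKeys/ozone-default.xml.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set(OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY, "RATIS");
    conf.set(OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_KEY, "THREE");
    // With these values getDefaultReplicationConfig() resolves to
    // ReplicationConfig.fromTypeAndString(ReplicationType.RATIS, "THREE"),
    // i.e. Ratis with three replicas.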
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
index a8e8547..2047ef6 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
@@ -28,7 +28,8 @@ import java.util.Map;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -356,8 +357,8 @@ public class OMDirectoryCreateRequest extends OMKeyRequest {
.setCreationTime(keyArgs.getModificationTime())
.setModificationTime(keyArgs.getModificationTime())
.setDataSize(0)
- .setReplicationConfig(ReplicationConfig
- .fromTypeAndFactor(keyArgs.getType(), keyArgs.getFactor()))
+ .setReplicationConfig(new StandaloneReplicationConfig(
+ HddsProtos.ReplicationFactor.ONE))
.setObjectID(objectId)
.setUpdateID(objectId);
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
index 73f2f07..c51324b 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponse;
import org.slf4j.Logger;
@@ -125,8 +126,14 @@ public class OMFileCreateRequest extends OMKeyRequest {
type = useRatis ? HddsProtos.ReplicationType.RATIS :
HddsProtos.ReplicationType.STAND_ALONE;
}
- ReplicationConfig repConfig = ReplicationConfig.fromProto(
- type, factor, keyArgs.getEcReplicationConfig());
+
+ final OmBucketInfo bucketInfo = ozoneManager
+ .getBucketInfo(keyArgs.getVolumeName(), keyArgs.getBucketName());
+ final ReplicationConfig repConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(type, factor,
+ keyArgs.getEcReplicationConfig(),
+ bucketInfo.getDefaultReplicationConfig(),
+ ozoneManager.getDefaultReplicationConfig());
// TODO: Here we are allocating the block without any check for
// bucket/key/volume existence and also without any authorization checks.
@@ -244,14 +251,19 @@ public class OMFileCreateRequest extends OMKeyRequest {
}
// do open key
- OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
- omMetadataManager.getBucketKey(volumeName, bucketName));
+ omBucketInfo =
+ getBucketInfo(omMetadataManager, volumeName, bucketName);
+ final ReplicationConfig repConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(keyArgs.getType(),
+ keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+ omBucketInfo.getDefaultReplicationConfig(),
+ ozoneManager.getDefaultReplicationConfig());
omKeyInfo = prepareKeyInfo(omMetadataManager, keyArgs, dbKeyInfo,
keyArgs.getDataSize(), locations, getFileEncryptionInfo(keyArgs),
- ozoneManager.getPrefixManager(), bucketInfo, trxnLogIndex,
+ ozoneManager.getPrefixManager(), omBucketInfo, trxnLogIndex,
ozoneManager.getObjectIdFromTxId(trxnLogIndex),
- ozoneManager.isRatisEnabled());
+ ozoneManager.isRatisEnabled(), repConfig);
long openVersion = omKeyInfo.getLatestVersionLocations().getVersion();
long clientID = createFileRequest.getClientID();
@@ -267,8 +279,6 @@ public class OMFileCreateRequest extends OMKeyRequest {
.stream().map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList());
omKeyInfo.appendNewBlocks(newLocationList, false);
-
- omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
// check bucket and volume quota
long preAllocatedSpace = newLocationList.size()
* ozoneManager.getScmBlockSize()
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
index d792222..1fdb723 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
@@ -19,9 +19,11 @@
package org.apache.hadoop.ozone.om.request.file;
import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -162,13 +164,18 @@ public class OMFileCreateRequestWithFSO extends
OMFileCreateRequest {
// do open key
OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
omMetadataManager.getBucketKey(volumeName, bucketName));
+ final ReplicationConfig repConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(keyArgs.getType(),
+ keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+ bucketInfo.getDefaultReplicationConfig(),
+ ozoneManager.getDefaultReplicationConfig());
OmKeyInfo omFileInfo = prepareFileInfo(omMetadataManager, keyArgs,
dbFileInfo, keyArgs.getDataSize(), locations,
getFileEncryptionInfo(keyArgs), ozoneManager.getPrefixManager(),
bucketInfo, pathInfoFSO, trxnLogIndex,
pathInfoFSO.getLeafNodeObjectId(),
- ozoneManager.isRatisEnabled());
+ ozoneManager.isRatisEnabled(), repConfig);
long openVersion = omFileInfo.getLatestVersionLocations().getVersion();
long clientID = createFileRequest.getClientID();
@@ -184,9 +191,9 @@ public class OMFileCreateRequestWithFSO extends
OMFileCreateRequest {
omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
// check bucket and volume quota
- long preAllocatedSpace = newLocationList.size()
- * ozoneManager.getScmBlockSize()
- * omFileInfo.getReplicationConfig().getRequiredNodes();
+ long preAllocatedSpace =
+ newLocationList.size() * ozoneManager.getScmBlockSize() * repConfig
+ .getRequiredNodes();
checkBucketQuotaInBytes(omBucketInfo, preAllocatedSpace);
checkBucketQuotaInNamespace(omBucketInfo, 1L);
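The quota accounting above now multiplies by the resolved config's
getRequiredNodes(), which is what makes EC keys cost more pre-allocated space
than Ratis keys. Rough worked numbers under assumed values (block size and EC
scheme are illustrative, not taken from this patch):

    // Illustrative arithmetic only. Assume one newly allocated block, a
    // 256 MB SCM block size and an EC rs-3-2 layout (3 data + 2 parity).
    long scmBlockSize = 256L * 1024 * 1024;
    int newBlocks = 1;
    int requiredNodes = 3 + 2;          // repConfig.getRequiredNodes()
    long preAllocatedSpace = newBlocks * scmBlockSize * requiredNodes;
    // 1 * 256 MB * 5 = 1280 MB charged against the bucket quota, versus
    // 768 MB for RATIS/THREE (requiredNodes == 3) for the same block.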
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
index 9ac43c9..f6a954f 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
@@ -222,8 +222,9 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
omBucketInfo.incrUsedBytes(preAllocatedSpace);
omResponse.setAllocateBlockResponse(AllocateBlockResponse.newBuilder()
.setKeyLocation(blockLocation).build());
+ OmBucketInfo shortBucketInfo = omBucketInfo.copyObject();
omClientResponse = new OMAllocateBlockResponse(omResponse.build(),
- openKeyInfo, clientID, omBucketInfo.copyObject());
+ openKeyInfo, clientID, shortBucketInfo);
LOG.debug("Allocated block for Volume:{}, Bucket:{}, OpenKey:{}",
volumeName, bucketName, openKeyName);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index 5fdfe6c..564c3d7 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequest;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
@@ -142,8 +143,13 @@ public class OMKeyCreateRequest extends OMKeyRequest {
type = useRatis ? HddsProtos.ReplicationType.RATIS :
HddsProtos.ReplicationType.STAND_ALONE;
}
- ReplicationConfig repConfig = ReplicationConfig.fromProto(
- type, factor, keyArgs.getEcReplicationConfig());
+ final OmBucketInfo bucketInfo = ozoneManager
+ .getBucketInfo(keyArgs.getVolumeName(), keyArgs.getBucketName());
+ final ReplicationConfig repConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(type, factor,
+ keyArgs.getEcReplicationConfig(),
+ bucketInfo.getDefaultReplicationConfig(),
+ ozoneManager.getDefaultReplicationConfig());
// TODO: Here we are allocating the block without any check for
// bucket/key/volume existence and also without any authorization checks.
@@ -278,11 +284,17 @@ public class OMKeyCreateRequest extends OMKeyRequest {
numMissingParents = missingParentInfos.size();
}
+ ReplicationConfig replicationConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(keyArgs.getType(),
+ keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+ bucketInfo.getDefaultReplicationConfig(),
+ ozoneManager.getDefaultReplicationConfig());
+
omKeyInfo = prepareKeyInfo(omMetadataManager, keyArgs, dbKeyInfo,
keyArgs.getDataSize(), locations, getFileEncryptionInfo(keyArgs),
ozoneManager.getPrefixManager(), bucketInfo, trxnLogIndex,
ozoneManager.getObjectIdFromTxId(trxnLogIndex),
- ozoneManager.isRatisEnabled());
+ ozoneManager.isRatisEnabled(), replicationConfig);
long openVersion = omKeyInfo.getLatestVersionLocations().getVersion();
long clientID = createKeyRequest.getClientID();
@@ -305,7 +317,7 @@ public class OMKeyCreateRequest extends OMKeyRequest {
// commitKey.
long preAllocatedSpace = newLocationList.size()
* ozoneManager.getScmBlockSize()
- * omKeyInfo.getReplicationConfig().getRequiredNodes();
+ * replicationConfig.getRequiredNodes();
// check bucket and volume quota
checkBucketQuotaInBytes(omBucketInfo, preAllocatedSpace);
checkBucketQuotaInNamespace(omBucketInfo, 1L);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
index 654c251..6125f9d 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
@@ -19,9 +19,11 @@
package org.apache.hadoop.ozone.om.request.key;
import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -145,13 +147,18 @@ public class OMKeyCreateRequestWithFSO extends
OMKeyCreateRequest {
// do open key
OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
omMetadataManager.getBucketKey(volumeName, bucketName));
+ final ReplicationConfig repConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(keyArgs.getType(),
+ keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+ bucketInfo.getDefaultReplicationConfig(),
+ ozoneManager.getDefaultReplicationConfig());
OmKeyInfo omFileInfo = prepareFileInfo(omMetadataManager, keyArgs,
dbFileInfo, keyArgs.getDataSize(), locations,
getFileEncryptionInfo(keyArgs), ozoneManager.getPrefixManager(),
bucketInfo, pathInfoFSO, trxnLogIndex,
pathInfoFSO.getLeafNodeObjectId(),
- ozoneManager.isRatisEnabled());
+ ozoneManager.isRatisEnabled(), repConfig);
long openVersion = omFileInfo.getLatestVersionLocations().getVersion();
long clientID = createKeyRequest.getClientID();
@@ -167,9 +174,9 @@ public class OMKeyCreateRequestWithFSO extends
OMKeyCreateRequest {
omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
// check bucket and volume quota
- long preAllocatedSpace = newLocationList.size()
- * ozoneManager.getScmBlockSize()
- * omFileInfo.getReplicationConfig().getRequiredNodes();
+ long preAllocatedSpace =
+ newLocationList.size() * ozoneManager.getScmBlockSize() * repConfig
+ .getRequiredNodes();
checkBucketQuotaInBytes(omBucketInfo, preAllocatedSpace);
checkBucketQuotaInNamespace(omBucketInfo, 1L);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index bfd7bb7..3410641 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -585,12 +585,13 @@ public abstract class OMKeyRequest extends
OMClientRequest {
@Nullable FileEncryptionInfo encInfo,
@Nonnull PrefixManager prefixManager,
@Nullable OmBucketInfo omBucketInfo,
- long transactionLogIndex, long objectID, boolean isRatisEnabled)
+ long transactionLogIndex, long objectID, boolean isRatisEnabled,
+ ReplicationConfig replicationConfig)
throws IOException {
return prepareFileInfo(omMetadataManager, keyArgs, dbKeyInfo, size,
locations, encInfo, prefixManager, omBucketInfo, null,
- transactionLogIndex, objectID, isRatisEnabled);
+ transactionLogIndex, objectID, isRatisEnabled, replicationConfig);
}
/**
@@ -608,7 +609,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
@Nullable OmBucketInfo omBucketInfo,
OMFileRequest.OMPathInfoWithFSO omPathInfo,
long transactionLogIndex, long objectID,
- boolean isRatisEnabled)
+ boolean isRatisEnabled, ReplicationConfig replicationConfig)
throws IOException {
if (keyArgs.getIsMultipartKey()) {
return prepareMultipartFileInfo(omMetadataManager, keyArgs,
@@ -633,11 +634,9 @@ public abstract class OMKeyRequest extends OMClientRequest
{
// the key does not exist, create a new object.
// Blocks will be appended as version 0.
- return createFileInfo(keyArgs, locations, ReplicationConfig
- .fromProto(keyArgs.getType(), keyArgs.getFactor(),
- keyArgs.getEcReplicationConfig()), keyArgs.getDataSize(),
- encInfo, prefixManager, omBucketInfo, omPathInfo, transactionLogIndex,
- objectID);
+ return createFileInfo(keyArgs, locations, replicationConfig,
+ keyArgs.getDataSize(), encInfo, prefixManager,
+ omBucketInfo, omPathInfo, transactionLogIndex, objectID);
}
/**
@@ -654,7 +653,6 @@ public abstract class OMKeyRequest extends OMClientRequest {
@Nullable OmBucketInfo omBucketInfo,
OMFileRequest.OMPathInfoWithFSO omPathInfo,
long transactionLogIndex, long objectID) {
-
OmKeyInfo.Builder builder = new OmKeyInfo.Builder();
builder.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
index 5746937..39f4057 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -164,9 +165,14 @@ public class S3InitiateMultipartUploadRequest extends
OMKeyRequest {
// also like this, even when key exists in a bucket, user can still
// initiate MPU.
- final ReplicationConfig replicationConfig =
- ReplicationConfig.fromTypeAndFactor(
- keyArgs.getType(), keyArgs.getFactor());
+ final OmBucketInfo bucketInfo = omMetadataManager.getBucketTable()
+ .get(omMetadataManager.getBucketKey(volumeName, bucketName));
+ final ReplicationConfig replicationConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(keyArgs.getType(),
+ keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+ bucketInfo != null ?
+ bucketInfo.getDefaultReplicationConfig() :
+ null, ozoneManager.getDefaultReplicationConfig());
multipartKeyInfo = new OmMultipartKeyInfo.Builder()
.setUploadID(keyArgs.getMultipartUploadID())
@@ -177,9 +183,6 @@ public class S3InitiateMultipartUploadRequest extends
OMKeyRequest {
.setUpdateID(transactionLogIndex)
.build();
- OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
- omMetadataManager.getBucketKey(volumeName, bucketName));
-
omKeyInfo = new OmKeyInfo.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
index da13536..90c44ed 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
@@ -24,8 +24,10 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -160,9 +162,14 @@ public class S3InitiateMultipartUploadRequestWithFSO
// care of in the final complete multipart upload. AWS S3 behavior is
// also like this, even when key exists in a bucket, user can still
// initiate MPU.
- final ReplicationConfig replicationConfig =
- ReplicationConfig.fromTypeAndFactor(
- keyArgs.getType(), keyArgs.getFactor());
+ final OmBucketInfo bucketInfo = omMetadataManager.getBucketTable()
+ .get(omMetadataManager.getBucketKey(volumeName, bucketName));
+ final ReplicationConfig replicationConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(keyArgs.getType(),
+ keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+ bucketInfo != null ?
+ bucketInfo.getDefaultReplicationConfig() :
+ null, ozoneManager.getDefaultReplicationConfig());
multipartKeyInfo = new OmMultipartKeyInfo.Builder()
.setUploadID(keyArgs.getMultipartUploadID())
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 93b7787..94107c4 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
import org.apache.hadoop.ozone.om.ResolvedBucket;
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.KeyManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
@@ -134,6 +135,8 @@ public class TestOMKeyRequest {
when(ozoneManager.isAdmin(any(String.class))).thenReturn(true);
when(ozoneManager.isAdmin(any(UserGroupInformation.class)))
.thenReturn(true);
+ when(ozoneManager.getBucketInfo(anyString(), anyString())).thenReturn(
+ new OmBucketInfo.Builder().setVolumeName("").setBucketName("").build());
Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
scmClient = Mockito.mock(ScmClient.class);
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
index 5fe2c0d..aa1230f 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
@@ -22,6 +22,8 @@ package org.apache.hadoop.ozone.om.request.s3.multipart;
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.junit.After;
import org.junit.Assert;
@@ -47,6 +49,8 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@@ -82,6 +86,10 @@ public class TestS3MultipartRequest {
when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
auditLogger = Mockito.mock(AuditLogger.class);
when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+ when(ozoneManager.getDefaultReplicationConfig()).thenReturn(
+ ReplicationConfig.fromTypeAndString(ReplicationType
+ .valueOf(OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT),
+ OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT));
Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
when(ozoneManager.resolveBucketLink(any(KeyArgs.class),
any(OMClientRequest.class)))
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
index 8d98882..65f2d5a 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
@@ -21,6 +21,8 @@ import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -69,6 +71,8 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -134,6 +138,10 @@ public class TestCleanupTableInfo {
);
when(om.getAclsEnabled()).thenReturn(false);
when(om.getAuditLogger()).thenReturn(mock(AuditLogger.class));
+ when(om.getDefaultReplicationConfig()).thenReturn(ReplicationConfig
+ .fromTypeAndString(ReplicationType
+ .valueOf(OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT),
+ OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT));
addVolumeToMetaTable(aVolumeArgs());
addBucketToMetaTable(aBucketInfo());
}
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index 6bcb689..e6e664e 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -160,6 +160,12 @@ public class BasicOzoneClientAdapterImpl implements
OzoneClientAdapter {
@Override
public short getDefaultReplication() {
+ if (replicationConfig == null) {
+ // To provide backward compatibility, we just return 3 here;
+ // however, the correct behavior still needs to be handled.
+ // TODO: Please see HDDS-5646
+ return (short) ReplicationFactor.THREE.getValue();
+ }
return (short) replicationConfig.getRequiredNodes();
}
@@ -194,19 +200,24 @@ public class BasicOzoneClientAdapterImpl implements
OzoneClientAdapter {
incrementCounter(Statistic.OBJECTS_CREATED, 1);
try {
OzoneOutputStream ozoneOutputStream = null;
+ ReplicationConfig replConfig = this.replicationConfig;
+ // If the bucket has a default replication config, prefer it.
+ if (bucket.getReplicationConfig() != null) {
+ replConfig = bucket.getReplicationConfig();
+ }
if (replication == ReplicationFactor.ONE.getValue()
|| replication == ReplicationFactor.THREE.getValue()) {
ReplicationConfig customReplicationConfig =
ReplicationConfig.adjustReplication(
- replicationConfig, replication
+ replConfig, replication
);
ozoneOutputStream =
bucket.createFile(key, 0, customReplicationConfig, overWrite,
recursive);
} else {
ozoneOutputStream =
- bucket.createFile(key, 0, replicationConfig, overWrite, recursive);
+ bucket.createFile(key, 0, replConfig, overWrite, recursive);
}
return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
} catch (OMException ex) {
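The adapter change above prefers the bucket's replication config, when one is
set, over the client-side default the adapter was constructed with. A sketch
of the effective selection (variable names are hypothetical):

    // Illustrative only. If the bucket was created with an EC default, a
    // file created through o3fs inherits it even though the adapter's own
    // replicationConfig (from client configuration) may be RATIS/THREE.
    ReplicationConfig effective = adapterDefaultReplicationConfig;
    if (bucket.getReplicationConfig() != null) {
      effective = bucket.getReplicationConfig();   // bucket default wins
    }
    OzoneOutputStream out =
        bucket.createFile(key, 0, effective, overWrite, recursive);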
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 1f8f25e..ee17992 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -266,6 +266,12 @@ public class BasicRootedOzoneClientAdapterImpl
@Override
public short getDefaultReplication() {
+ if (replicationConfig == null) {
+ // To provide backward compatibility, we just return 3 here;
+ // however, the correct behavior still needs to be handled.
+ // TODO: Please see HDDS-5646
+ return (short) ReplicationFactor.THREE.getValue();
+ }
return (short) replicationConfig.getRequiredNodes();
}
@@ -309,16 +315,21 @@ public class BasicRootedOzoneClientAdapterImpl
try {
// Hadoop CopyCommands class always sets recursive to true
OzoneBucket bucket = getBucket(ofsPath, recursive);
+ ReplicationConfig replConfig = this.replicationConfig;
+ // If the bucket has a default replication config, prefer it.
+ if (bucket.getReplicationConfig() != null) {
+ replConfig = bucket.getReplicationConfig();
+ }
OzoneOutputStream ozoneOutputStream = null;
if (replication == ReplicationFactor.ONE.getValue()
|| replication == ReplicationFactor.THREE.getValue()) {
ozoneOutputStream = bucket.createFile(key, 0,
- ReplicationConfig.adjustReplication(replicationConfig, replication),
+ ReplicationConfig.adjustReplication(replConfig, replication),
overWrite, recursive);
} else {
ozoneOutputStream =
- bucket.createFile(key, 0, replicationConfig, overWrite, recursive);
+ bucket.createFile(key, 0, replConfig, overWrite, recursive);
}
return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
} catch (OMException ex) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]