This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new d6dc278  EC: Make ECReplicationConfig stored as bucket level 
attributes. (#2401)
d6dc278 is described below

commit d6dc2781952bfb0e971668ca36308064fc8e5314
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Thu Aug 26 08:41:07 2021 -0700

    EC: Make ECReplicationConfig stored as bucket level attributes. (#2401)
---
 .../hdds/client/DefaultReplicationConfig.java      | 110 +++++++++++++++++++++
 .../hadoop/hdds/client/ReplicationFactor.java      |  16 +++
 .../apache/hadoop/hdds/client/ReplicationType.java |  20 ++++
 .../common/src/main/resources/ozone-default.xml    |  14 +--
 .../hadoop/hdds/client/TestReplicationConfig.java  |   7 +-
 .../interface-client/src/main/proto/hdds.proto     |   8 ++
 .../scm/pipeline/TestRatisPipelineProvider.java    |   6 ++
 .../org/apache/hadoop/ozone/client/BucketArgs.java |  26 ++++-
 .../apache/hadoop/ozone/client/OzoneBucket.java    |  41 +++++++-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  16 ++-
 .../hadoop/ozone/client/MockOmTransport.java       | 110 +++++++++++++++++----
 .../hadoop/ozone/client/TestOzoneECClient.java     |  39 ++++++--
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  12 +++
 .../hadoop/ozone/om/helpers/OmBucketInfo.java      |  71 +++++++++++--
 .../apache/hadoop/ozone/protocolPB/OMPBHelper.java |  54 ++++++++++
 .../hadoop/ozone/om/helpers/TestOmBucketInfo.java  |  52 ++++++++++
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   6 ++
 .../ozone/client/rpc/TestECKeyOutputStream.java    |  90 ++++++++++++++++-
 .../org/apache/hadoop/ozone/om/TestOmAcls.java     |   9 +-
 .../src/main/proto/OmClientProtocol.proto          |   6 +-
 .../apache/hadoop/ozone/om/OzoneConfigUtil.java    |  60 +++++++++++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  23 ++++-
 .../om/request/file/OMDirectoryCreateRequest.java  |   7 +-
 .../ozone/om/request/file/OMFileCreateRequest.java |  26 +++--
 .../request/file/OMFileCreateRequestWithFSO.java   |  15 ++-
 .../om/request/key/OMAllocateBlockRequest.java     |   3 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |  20 +++-
 .../om/request/key/OMKeyCreateRequestWithFSO.java  |  15 ++-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  16 ++-
 .../S3InitiateMultipartUploadRequest.java          |  15 +--
 .../S3InitiateMultipartUploadRequestWithFSO.java   |  13 ++-
 .../ozone/om/request/key/TestOMKeyRequest.java     |   3 +
 .../s3/multipart/TestS3MultipartRequest.java       |   8 ++
 .../ozone/om/response/TestCleanupTableInfo.java    |   8 ++
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |  15 ++-
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |  15 ++-
 36 files changed, 872 insertions(+), 103 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/DefaultReplicationConfig.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/DefaultReplicationConfig.java
new file mode 100644
index 0000000..a11b7fc
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/DefaultReplicationConfig.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.client;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+import java.util.Objects;
+
+/**
+ * Default replication configuration, e.g. as stored on a bucket: a
+ * replication type with either a replication factor (non-EC types) or
+ * an {@link ECReplicationConfig} (EC type).
+ */
+public class DefaultReplicationConfig {
+
+  private final ReplicationType type;
+  private final ReplicationFactor factor;
+  private final ECReplicationConfig ecReplicationConfig;
+
+  public DefaultReplicationConfig(ReplicationType type,
+      ReplicationFactor factor) {
+    this(type, factor, null);
+  }
+
+  public DefaultReplicationConfig(ReplicationType type,
+      ECReplicationConfig ecReplicationConfig) {
+    this(type, null, ecReplicationConfig);
+  }
+
+  public DefaultReplicationConfig(ReplicationType type,
+      ReplicationFactor factor, ECReplicationConfig ecReplicationConfig) {
+    this.type = type;
+    this.factor = factor;
+    this.ecReplicationConfig = ecReplicationConfig;
+  }
+
+  public ReplicationType getType() {
+    return this.type;
+  }
+
+  public ReplicationFactor getFactor() {
+    return this.factor;
+  }
+
+  public DefaultReplicationConfig copy() {
+    return new DefaultReplicationConfig(this.type, this.factor,
+        this.ecReplicationConfig);
+  }
+
+  public ECReplicationConfig getEcReplicationConfig() {
+    return this.ecReplicationConfig;
+  }
+
+  /** Node count from the EC config for EC type, else from the factor. */
+  public int getRequiredNodes() {
+    if (this.type == ReplicationType.EC) {
+      return ecReplicationConfig.getRequiredNodes();
+    }
+    return this.factor.getValue();
+  }
+
+  public HddsProtos.DefaultReplicationConfig toProto() {
+    final HddsProtos.DefaultReplicationConfig.Builder builder =
+        HddsProtos.DefaultReplicationConfig.newBuilder()
+            .setType(ReplicationType.toProto(this.type));
+    if (this.factor != null) {
+      builder.setFactor(ReplicationFactor.toProto(this.factor));
+    }
+    if (this.ecReplicationConfig != null) {
+      builder.setEcReplicationConfig(this.ecReplicationConfig.toProto());
+    }
+    return builder.build();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DefaultReplicationConfig that = (DefaultReplicationConfig) o;
+    // The EC config must be compared with the other instance's field.
+    return Objects.equals(type, that.type)
+        && Objects.equals(factor, that.factor)
+        && Objects.equals(ecReplicationConfig, that.ecReplicationConfig);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(type, factor, ecReplicationConfig);
+  }
+}
+
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
index 044bd6f..6cb16a4 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationFactor.java
@@ -71,6 +71,22 @@ public enum ReplicationFactor {
     }
   }
 
+  public static HddsProtos.ReplicationFactor toProto(
+       ReplicationFactor replicationFactor) {
+    if (replicationFactor == null) {
+      return null; // null is passed through unconverted
+    }
+    switch (replicationFactor) {
+    case ONE:
+      return HddsProtos.ReplicationFactor.ONE;
+    case THREE:
+      return HddsProtos.ReplicationFactor.THREE;
+    default: // a client-side factor with no ProtoBuf counterpart
+      throw new IllegalArgumentException(
+          "Unsupported replication factor: " + replicationFactor);
+    }
+  }
+
   /**
    * Returns integer representation of ReplicationFactor.
    * @return replication value
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java
index 83f04bb..64969ea 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/ReplicationType.java
@@ -48,4 +48,24 @@ public enum ReplicationType {
           "Unsupported ProtoBuf replication type: " + replicationType);
     }
   }
+
+  public static HddsProtos.ReplicationType toProto(
+       ReplicationType replicationType) {
+    if (replicationType == null) {
+      return null; // null is passed through unconverted
+    }
+    switch (replicationType) { // each constant maps to its namesake
+    case RATIS:
+      return HddsProtos.ReplicationType.RATIS;
+    case STAND_ALONE:
+      return HddsProtos.ReplicationType.STAND_ALONE;
+    case CHAINED:
+      return HddsProtos.ReplicationType.CHAINED;
+    case EC:
+      return HddsProtos.ReplicationType.EC;
+    default: // a client-side type with no mapping listed above
+      throw new IllegalArgumentException(
+          "Unsupported replication type: " + replicationType);
+    }
+  }
 }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index eacd12a..c26655f 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1173,24 +1173,26 @@
   </property>
 
   <property>
-    <name>ozone.replication</name>
+    <name>ozone.server.default.replication</name>
     <value>3</value>
-    <tag>OZONE, CLIENT</tag>
+    <tag>OZONE</tag>
     <description>
       Default replication value. The actual number of replications can be
       specified when writing the key. The default is used if replication
-      is not specified. Supported values: 1 and 3.
+      is not specified when creating the key and no default replication is set
+      on the bucket. Supported values: 1, 3 and EC_3_2.
     </description>
   </property>
 
   <property>
-    <name>ozone.replication.type</name>
+    <name>ozone.server.default.replication.type</name>
     <value>RATIS</value>
-    <tag>OZONE, CLIENT</tag>
+    <tag>OZONE</tag>
     <description>
       Default replication type to be used while writing key into ozone. The
       value can be specified when writing the key, default is used when
-      nothing is specified. Supported values: RATIS, STAND_ALONE and CHAINED.
+      nothing is specified when creating the key and no default is set on the
+      bucket. Supported values: RATIS, STAND_ALONE, CHAINED and EC.
     </description>
   </property>
   <property>
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/TestReplicationConfig.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/TestReplicationConfig.java
index 1c75d48..a85415e 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/TestReplicationConfig.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/client/TestReplicationConfig.java
@@ -30,13 +30,10 @@ import org.junit.Test;
 public class TestReplicationConfig {
 
   @Test
-  public void testGetDefaultShouldCreateReplicationConfigFromDefaultConf() {
+  public void testGetDefaultShouldReturnNullIfNotSetClientSide() {
     OzoneConfiguration conf = new OzoneConfiguration();
     ReplicationConfig replicationConfig = ReplicationConfig.getDefault(conf);
-    Assert.assertEquals(
-        org.apache.hadoop.hdds.client.ReplicationType.RATIS.name(),
-        replicationConfig.getReplicationType().name());
-    Assert.assertEquals(3, replicationConfig.getRequiredNodes());
+    Assert.assertNull(replicationConfig);
   }
 
   @Test
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto 
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 34d7fec..595fdd6 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -261,11 +261,13 @@ enum ReplicationType {
     STAND_ALONE = 2;
     CHAINED = 3;
     EC = 4;
+    NONE = -1; // Invalid Type
 }
 
 enum ReplicationFactor {
     ONE = 1;
     THREE = 3;
+    ZERO = 0; // Invalid Factor
 }
 
 message ECReplicationConfig {
@@ -273,6 +275,12 @@ message ECReplicationConfig {
     required int32 parity = 2;
 }
 
+message DefaultReplicationConfig {
+    required ReplicationType type = 1;
+    optional ReplicationFactor factor = 2;
+    optional ECReplicationConfig ecReplicationConfig = 3;
+}
+
 enum ScmOps {
     allocateBlock = 1;
     keyBlocksInfoList = 2;
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index b5fd0a7..f547116 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -263,6 +263,9 @@ public class TestRatisPipelineProvider {
     largeContainerConf.set(OZONE_SCM_CONTAINER_SIZE, "100TB");
     init(1, largeContainerConf);
     for (ReplicationFactor factor: ReplicationFactor.values()) {
+      if (factor == ReplicationFactor.ZERO) {
+        continue;
+      }
       try {
         provider.create(new RatisReplicationConfig(factor));
         Assert.fail("Expected SCMException for large container size with " +
@@ -276,6 +279,9 @@ public class TestRatisPipelineProvider {
     largeMetadataConf.set(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN, "100TB");
     init(1, largeMetadataConf);
     for (ReplicationFactor factor: ReplicationFactor.values()) {
+      if (factor == ReplicationFactor.ZERO) {
+        continue;
+      }
       try {
         provider.create(new RatisReplicationConfig(factor));
         Assert.fail("Expected SCMException for large metadata size with " +
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
index 7227034..34d599e 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/BucketArgs.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.client;
 
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -56,6 +57,7 @@ public final class BucketArgs {
    * Bucket encryption key name.
    */
   private String bucketEncryptionKey;
+  private DefaultReplicationConfig defaultReplicationConfig;
   private final String sourceVolume;
   private final String sourceBucket;
 
@@ -78,13 +80,15 @@ public final class BucketArgs {
    * @param sourceBucket
    * @param quotaInBytes Bucket quota in bytes.
    * @param quotaInNamespace Bucket quota in counts.
-   * @param bucketLayout Bucket Layouts.
+   * @param bucketLayout bucket layout.
+   * @param defaultReplicationConfig default replication config.
    */
   @SuppressWarnings("parameternumber")
   private BucketArgs(Boolean versioning, StorageType storageType,
       List<OzoneAcl> acls, Map<String, String> metadata,
       String bucketEncryptionKey, String sourceVolume, String sourceBucket,
-      long quotaInBytes, long quotaInNamespace, BucketLayout bucketLayout) {
+      long quotaInBytes, long quotaInNamespace, BucketLayout bucketLayout,
+      DefaultReplicationConfig defaultReplicationConfig) {
     this.acls = acls;
     this.versioning = versioning;
     this.storageType = storageType;
@@ -95,6 +99,7 @@ public final class BucketArgs {
     this.quotaInBytes = quotaInBytes;
     this.quotaInNamespace = quotaInNamespace;
     this.bucketLayout = bucketLayout;
+    this.defaultReplicationConfig = defaultReplicationConfig;
   }
 
   /**
@@ -139,6 +144,14 @@ public final class BucketArgs {
   }
 
   /**
+   * Returns the bucket default replication config.
+   * @return bucket's default Replication Config.
+   */
+  public DefaultReplicationConfig getDefaultReplicationConfig() {
+    return this.defaultReplicationConfig;
+  }
+
+  /**
    * Returns new builder class that builds a OmBucketInfo.
    *
    * @return Builder
@@ -192,6 +205,7 @@ public final class BucketArgs {
     private long quotaInBytes;
     private long quotaInNamespace;
     private BucketLayout bucketLayout;
+    private DefaultReplicationConfig defaultReplicationConfig;
 
     public Builder() {
       metadata = new HashMap<>();
@@ -249,6 +263,12 @@ public final class BucketArgs {
       return this;
     }
 
+    public BucketArgs.Builder setDefaultReplicationConfig(
+        DefaultReplicationConfig defaultReplConfig) {
+      this.defaultReplicationConfig = defaultReplConfig; // stored as given
+      return this;
+    }
+
 
     /**
      * Constructs the BucketArgs.
@@ -257,7 +277,7 @@ public final class BucketArgs {
     public BucketArgs build() {
       return new BucketArgs(versioning, storageType, acls, metadata,
           bucketEncryptionKey, sourceVolume, sourceBucket, quotaInBytes,
-          quotaInNamespace, bucketLayout);
+          quotaInNamespace, bucketLayout, defaultReplicationConfig);
     }
   }
 }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 8a8cde9..575612d 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
@@ -80,7 +82,7 @@ public class OzoneBucket extends WithMetadata {
   /**
    * Default replication factor to be used while creating keys.
    */
-  private final ReplicationConfig defaultReplication;
+  private ReplicationConfig defaultReplication;
 
   /**
    * Type of storage to be used for this bucket.
@@ -221,6 +223,39 @@ public class OzoneBucket extends WithMetadata {
     this.bucketLayout = bucketLayout;
   }
 
+  @SuppressWarnings("parameternumber")
+  public OzoneBucket(ConfigurationSource conf, ClientProtocol proxy,
+      String volumeName, String bucketName, StorageType storageType,
+      Boolean versioning, long creationTime, long modificationTime,
+      Map<String, String> metadata, String encryptionKeyName,
+      String sourceVolume, String sourceBucket, long usedBytes,
+      long usedNamespace, long quotaInBytes, long quotaInNamespace,
+      BucketLayout bucketLayout,
+      DefaultReplicationConfig defaultReplicationConfig) {
+    this(conf, proxy, volumeName, bucketName, storageType, versioning,
+        creationTime, modificationTime, metadata, encryptionKeyName,
+        sourceVolume, sourceBucket, usedBytes, usedNamespace, quotaInBytes,
+        quotaInNamespace);
+    this.bucketLayout = bucketLayout;
+    if (defaultReplicationConfig != null) { // a bucket default was supplied
+      this.defaultReplication =
+          defaultReplicationConfig.getType() == ReplicationType.EC ?
+              defaultReplicationConfig.getEcReplicationConfig() :
+              ReplicationConfig
+                  .fromTypeAndFactor(defaultReplicationConfig.getType(),
+                      defaultReplicationConfig.getFactor());
+    } else {
+      // No bucket default was supplied (e.g. when talking to an old
+      // server), so fall back to the client-side configuration values.
+      this.defaultReplication = ReplicationConfig.fromTypeAndString(
+          ReplicationType.valueOf(
+              conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
+                  OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT)),
+          conf.get(OzoneConfigKeys.OZONE_REPLICATION,
+              OzoneConfigKeys.OZONE_REPLICATION_DEFAULT));
+    }
+  }
+
   /**
    * Constructs OzoneBucket instance.
    * @param conf Configuration object.
@@ -1219,4 +1254,8 @@ public class OzoneBucket extends WithMetadata {
   public BucketLayout getBucketLayout() {
     return bucketLayout;
   }
+
+  public ReplicationConfig getReplicationConfig() { // the bucket's default
+    return defaultReplication;
+  }
 }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index c5c9899..f0f5dee 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
@@ -500,6 +501,12 @@ public class RpcClient implements ClientProtocol {
       builder.setBucketEncryptionKey(bek);
     }
 
+    DefaultReplicationConfig defaultReplicationConfig =
+        bucketArgs.getDefaultReplicationConfig();
+    if (defaultReplicationConfig != null) {
+      builder.setDefaultReplicationConfig(defaultReplicationConfig);
+    }
+
     LOG.info("Creating Bucket: {}/{}, with Versioning {} and " +
             "Storage Type set to {} and Encryption set to {} ",
         volumeName, bucketName, isVersionEnabled, storageType, bek != null);
@@ -718,7 +725,8 @@ public class RpcClient implements ClientProtocol {
         bucketInfo.getUsedNamespace(),
         bucketInfo.getQuotaInBytes(),
         bucketInfo.getQuotaInNamespace(),
-        bucketInfo.getBucketLayout()
+        bucketInfo.getBucketLayout(),
+        bucketInfo.getDefaultReplicationConfig()
     );
   }
 
@@ -771,7 +779,7 @@ public class RpcClient implements ClientProtocol {
     if (checkKeyNameEnabled) {
       HddsClientUtils.verifyKeyName(keyName);
     }
-    HddsClientUtils.checkNotNull(keyName, replicationConfig);
+    HddsClientUtils.checkNotNull(keyName);
     String requestId = UUID.randomUUID().toString();
 
     OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
@@ -1394,14 +1402,14 @@ public class RpcClient implements ClientProtocol {
       keyOutputStream = new ECKeyOutputStream.Builder().setHandler(openKey)
           .setXceiverClientManager(xceiverClientManager)
           .setOmClient(ozoneManagerClient).setRequestID(requestId)
-          .setReplicationConfig(replicationConfig)
+          .setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
           .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
           .setConfig(clientConfig).build();
     } else {
       keyOutputStream = new KeyOutputStream.Builder().setHandler(openKey)
           .setXceiverClientManager(xceiverClientManager)
           .setOmClient(ozoneManagerClient).setRequestID(requestId)
-          .setReplicationConfig(replicationConfig)
+          .setReplicationConfig(openKey.getKeyInfo().getReplicationConfig())
           .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
           .setConfig(clientConfig).build();
     }
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
index f38ee35..1da0c2d 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockOmTransport.java
@@ -17,6 +17,12 @@
  */
 package org.apache.hadoop.ozone.client;
 
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -53,6 +59,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
 
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
+
 /**
  * OM transport for testing with in-memory state.
  */
@@ -165,33 +174,96 @@ public class MockOmTransport implements OmTransport {
   private CreateKeyResponse createKey(CreateKeyRequest createKeyRequest) {
     final KeyArgs keyArgs = createKeyRequest.getKeyArgs();
     final long now = System.currentTimeMillis();
-    final KeyInfo keyInfo = KeyInfo.newBuilder()
-        .setVolumeName(keyArgs.getVolumeName())
-        .setBucketName(keyArgs.getBucketName())
-        .setKeyName(keyArgs.getKeyName())
-        .setCreationTime(now)
-        .setModificationTime(now)
-        .setType(keyArgs.getType())
-        .setFactor(keyArgs.getFactor())
-        .setDataSize(keyArgs.getDataSize())
-        .setLatestVersion(0L)
-        .addKeyLocationList(KeyLocationList.newBuilder()
-            .addAllKeyLocations(
+    final BucketInfo bucketInfo =
+        buckets.get(keyArgs.getVolumeName()).get(keyArgs.getBucketName());
+
+    final KeyInfo.Builder keyInfoBuilder =
+        KeyInfo.newBuilder().setVolumeName(keyArgs.getVolumeName())
+            .setBucketName(keyArgs.getBucketName())
+            .setKeyName(keyArgs.getKeyName()).setCreationTime(now)
+            .setModificationTime(now).setDataSize(keyArgs.getDataSize())
+            .setLatestVersion(0L).addKeyLocationList(
+            KeyLocationList.newBuilder().addAllKeyLocations(
                 blockAllocator.allocateBlock(createKeyRequest.getKeyArgs()))
-            .build())
-        .build();
+                .build());
+
+    if (keyArgs.getType() == HddsProtos.ReplicationType.NONE) {
+      // 1. Client did not pass replication config.
+      // Now lets try bucket defaults
+      if (bucketInfo.getDefaultReplicationConfig() != null) {
+        // Since Bucket defaults are available, let's inherit
+        final HddsProtos.ReplicationType type =
+            bucketInfo.getDefaultReplicationConfig().getType();
+        keyInfoBuilder
+            .setType(bucketInfo.getDefaultReplicationConfig().getType());
+        switch (type) {
+        case EC:
+          keyInfoBuilder.setEcReplicationConfig(
+              bucketInfo.getDefaultReplicationConfig()
+                  .getEcReplicationConfig());
+          break;
+        case RATIS:
+        case STAND_ALONE:
+          keyInfoBuilder
+              .setFactor(bucketInfo.getDefaultReplicationConfig().getFactor());
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Unknown replication type: " + type);
+        }
+      } else {
+        keyInfoBuilder.setType(HddsProtos.ReplicationType.RATIS);
+        keyInfoBuilder.setFactor(HddsProtos.ReplicationFactor.THREE);
+      }
+    } else {
+      // 1. Client passed the replication config.
+      // Let's use it.
+      final HddsProtos.ReplicationType type = keyArgs.getType();
+      keyInfoBuilder.setType(type);
+      switch (type) {
+      case EC:
+        
keyInfoBuilder.setEcReplicationConfig(keyArgs.getEcReplicationConfig());
+        break;
+      case RATIS:
+      case STAND_ALONE:
+        keyInfoBuilder.setFactor(keyArgs.getFactor());
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Unknown replication type: " + type);
+      }
+    }
+
+    final KeyInfo keyInfo = keyInfoBuilder.build();
     openKeys.get(keyInfo.getVolumeName()).get(keyInfo.getBucketName())
         .put(keyInfo.getKeyName(), keyInfo);
-    return CreateKeyResponse.newBuilder()
-        .setOpenVersion(0L)
-        .setKeyInfo(keyInfo)
+    return 
CreateKeyResponse.newBuilder().setOpenVersion(0L).setKeyInfo(keyInfo)
         .build();
   }
 
   private InfoBucketResponse infoBucket(InfoBucketRequest infoBucketRequest) {
+    BucketInfo bucketInfo = buckets.get(infoBucketRequest.getVolumeName())
+        .get(infoBucketRequest.getBucketName());
+    if(!bucketInfo.hasDefaultReplicationConfig()) {
+      final ReplicationConfig replicationConfig = ReplicationConfig
+          .fromTypeAndString(ReplicationType
+                  .valueOf(OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT),
+              OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT);
+
+      bucketInfo = bucketInfo.toBuilder().setDefaultReplicationConfig(
+          new DefaultReplicationConfig(
+              
ReplicationType.fromProto(replicationConfig.getReplicationType()),
+              replicationConfig
+                  .getReplicationType() != HddsProtos.ReplicationType.EC ?
+                  ReplicationFactor
+                      .valueOf(replicationConfig.getRequiredNodes()) :
+                  null, replicationConfig
+              .getReplicationType() == HddsProtos.ReplicationType.EC ?
+              (ECReplicationConfig) replicationConfig :
+              null).toProto()).build();
+    }
     return InfoBucketResponse.newBuilder()
-        .setBucketInfo(buckets.get(infoBucketRequest.getVolumeName())
-            .get(infoBucketRequest.getBucketName()))
+        .setBucketInfo(bucketInfo)
         .build();
   }
 
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
index 56e7d34..70d4406 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneECClient.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.ozone.client;
 
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.InMemoryConfiguration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -31,6 +33,7 @@ import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.ErasureCodecOptions;
 import org.apache.hadoop.io.erasurecode.codec.RSErasureCodec;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
@@ -122,7 +125,7 @@ public class TestOzoneECClient {
 
   @Test
   public void testPutECKeyAndCheckDNStoredData() throws IOException {
-    OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+    OzoneBucket bucket = writeIntoECKey(inputChunks, keyName, null);
     OzoneKey key = bucket.getKey(keyName);
     Assert.assertEquals(keyName, key.getName());
     Map<DatanodeDetails, MockDatanodeStorage> storages =
@@ -142,7 +145,7 @@ public class TestOzoneECClient {
 
   @Test
   public void testPutECKeyAndCheckParityData() throws IOException {
-    OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+    OzoneBucket bucket = writeIntoECKey(inputChunks, keyName, null);
     final ByteBuffer[] dataBuffers = new ByteBuffer[3];
     for (int i = 0; i < inputChunks.length; i++) {
       dataBuffers[i] = ByteBuffer.wrap(inputChunks[i]);
@@ -174,7 +177,7 @@ public class TestOzoneECClient {
 
   @Test
   public void testPutECKeyAndReadContent() throws IOException {
-    OzoneBucket bucket = writeIntoECKey(inputChunks, keyName);
+    OzoneBucket bucket = writeIntoECKey(inputChunks, keyName, null);
     OzoneKey key = bucket.getKey(keyName);
     Assert.assertEquals(keyName, key.getName());
     try (OzoneInputStream is = bucket.readKey(keyName)) {
@@ -205,17 +208,41 @@ public class TestOzoneECClient {
     }
   }
 
-  private OzoneBucket writeIntoECKey(byte[][] chunks, String key)
+  /**
+   * Creates a bucket whose default replication config is EC and checks that
+   * a key created without an explicit replication config is written through
+   * an {@link ECKeyOutputStream}.
+   */
+  @Test
+  public void testCreateBucketWithDefaultReplicationConfig()
       throws IOException {
+    final DefaultReplicationConfig bucketDefault =
+        new DefaultReplicationConfig(ReplicationType.EC,
+            new ECReplicationConfig(dataBlocks, parityBlocks));
+    final OzoneBucket bucket =
+        writeIntoECKey(inputChunks, keyName, bucketDefault);
+
+    // No replication config is passed here, so the key is expected to
+    // inherit the EC replication set on the bucket.
+    try (OzoneOutputStream stream = bucket.createKey("mykey", 2000)) {
+      Assert.assertTrue(stream.getOutputStream() instanceof ECKeyOutputStream);
+      for (byte[] chunk : inputChunks) {
+        stream.write(chunk);
+      }
+    }
+  }
+
+
+  private OzoneBucket writeIntoECKey(byte[][] chunks, String key,
+      DefaultReplicationConfig defaultReplicationConfig) throws IOException {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
     store.createVolume(volumeName);
     OzoneVolume volume = store.getVolume(volumeName);
-    volume.createBucket(bucketName);
+    if (defaultReplicationConfig != null) {
+      final BucketArgs.Builder builder = BucketArgs.newBuilder();
+      builder.setDefaultReplicationConfig(defaultReplicationConfig);
+      volume.createBucket(bucketName, builder.build());
+    } else {
+      volume.createBucket(bucketName);
+    }
     OzoneBucket bucket = volume.getBucket(bucketName);
 
     try (OzoneOutputStream out = bucket.createKey(key, 2000,
-        new ECReplicationConfig(dataBlocks, parityBlocks), new HashMap<>())) {
+        new ECReplicationConfig(3, 2), new HashMap<>())) {
       for (int i = 0; i < chunks.length; i++) {
         out.write(chunks[i]);
       }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 563f99c..751b9c9 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.om;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.ratis.util.TimeDuration;
 
 /**
@@ -240,6 +242,16 @@ public final class OMConfigKeys {
   public static final boolean OZONE_OM_ENABLE_FILESYSTEM_PATHS_DEFAULT =
       false;
 
+  // Config key for the server-side default replication value, and its
+  // built-in default: the string form of ReplicationFactor.THREE.
+  public static final String OZONE_SERVER_DEFAULT_REPLICATION_KEY =
+      "ozone.server.default.replication";
+  public static final String OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT =
+      ReplicationFactor.THREE.toString();
+
+  // Config key for the server-side default replication type, and its
+  // built-in default: the string form of ReplicationType.RATIS.
+  public static final String OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY =
+      "ozone.server.default.replication.type";
+  public static final String OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT =
+      ReplicationType.RATIS.toString();
+
   public static final String OZONE_OM_HA_PREFIX = "ozone.om.ha";
 
   public static final String OZONE_FS_TRASH_INTERVAL_KEY =
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
index b1b1102..a1da197 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmBucketInfo.java
@@ -27,7 +27,13 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.Auditable;
@@ -75,6 +81,11 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
    */
   private BucketEncryptionKeyInfo bekInfo;
 
+  /**
+   * Optional default replication for bucket.
+   */
+  private DefaultReplicationConfig defaultReplicationConfig;
+
   private final String sourceVolume;
 
   private final String sourceBucket;
@@ -107,7 +118,8 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
    * @param usedBytes - Bucket Quota Usage in bytes.
    * @param quotaInBytes Bucket quota in bytes.
    * @param quotaInNamespace Bucket quota in counts.
-   * @param bucketLayout Bucket Layout.
+   * @param bucketLayout bucket layout.
+   * @param defaultReplicationConfig default replication config.
    */
   @SuppressWarnings("checkstyle:ParameterNumber")
   private OmBucketInfo(String volumeName,
@@ -127,7 +139,8 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
       long usedNamespace,
       long quotaInBytes,
       long quotaInNamespace,
-      BucketLayout bucketLayout) {
+      BucketLayout bucketLayout,
+      DefaultReplicationConfig defaultReplicationConfig) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.acls = acls;
@@ -146,6 +159,7 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
     this.quotaInBytes = quotaInBytes;
     this.quotaInNamespace = quotaInNamespace;
     this.bucketLayout = bucketLayout;
+    this.defaultReplicationConfig = defaultReplicationConfig;
   }
 
   /**
@@ -245,12 +259,22 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
 
   /**
    * Returns the Bucket Layout.
+   *
    * @return BucketLayout.
    */
   public BucketLayout getBucketLayout() {
     return bucketLayout;
   }
 
+  /**
+   * Returns the default replication config of this bucket.
+   *
+   * @return default replication config; null when none is set.
+   */
+  public DefaultReplicationConfig getDefaultReplicationConfig() {
+    return defaultReplicationConfig;
+  }
+
   public String getSourceVolume() {
     return sourceVolume;
   }
@@ -340,6 +364,10 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
         acl.getName(), (BitSet) acl.getAclBitSet().clone(),
         acl.getAclScope())));
 
+    if (defaultReplicationConfig != null) {
+      builder.setDefaultReplicationConfig(defaultReplicationConfig.copy());
+    }
+
     return builder.build();
   }
 
@@ -362,7 +390,21 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
         .setUsedNamespace(usedNamespace)
         .setQuotaInBytes(quotaInBytes)
         .setQuotaInNamespace(quotaInNamespace)
-        .setBucketLayout(bucketLayout);
+        .setBucketLayout(bucketLayout)
+        .setDefaultReplicationConfig(defaultReplicationConfig);
+  }
+
+  /**
+   * Replaces the bucket's default replication config with one derived from
+   * the given replication config. For an EC config the factor is left null
+   * and the EC config itself is kept; for every other type the factor is
+   * looked up from the required node count and the EC config is left null.
+   *
+   * @param replicationConfig replication config to derive the default from.
+   */
+  public void setDefaultReplicationConfig(
+      ReplicationConfig replicationConfig) {
+    final ReplicationType type =
+        ReplicationType.fromProto(replicationConfig.getReplicationType());
+    final ReplicationFactor factor;
+    final ECReplicationConfig ecConfig;
+    if (replicationConfig.getReplicationType()
+        == HddsProtos.ReplicationType.EC) {
+      factor = null;
+      ecConfig = (ECReplicationConfig) replicationConfig;
+    } else {
+      factor = ReplicationFactor.valueOf(replicationConfig.getRequiredNodes());
+      ecConfig = null;
+    }
+    this.defaultReplicationConfig =
+        new DefaultReplicationConfig(type, factor, ecConfig);
   }
 
   /**
@@ -387,6 +429,7 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
     private long quotaInBytes;
     private long quotaInNamespace;
     private BucketLayout bucketLayout;
+    private DefaultReplicationConfig defaultReplicationConfig;
 
     public Builder() {
       //Default values
@@ -510,6 +553,12 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
       return this;
     }
 
+    /**
+     * Sets the default replication config of the bucket being built.
+     *
+     * @param defaultReplConfig default replication config, stored as given.
+     * @return this builder.
+     */
+    public Builder setDefaultReplicationConfig(
+        DefaultReplicationConfig defaultReplConfig) {
+      this.defaultReplicationConfig = defaultReplConfig;
+      return this;
+    }
+
     /**
      * Constructs the OmBucketInfo.
      * @return instance of OmBucketInfo.
@@ -520,11 +569,11 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
       Preconditions.checkNotNull(acls);
       Preconditions.checkNotNull(isVersionEnabled);
       Preconditions.checkNotNull(storageType);
-
       return new OmBucketInfo(volumeName, bucketName, acls, isVersionEnabled,
           storageType, creationTime, modificationTime, objectID, updateID,
           metadata, bekInfo, sourceVolume, sourceBucket, usedBytes,
-          usedNamespace, quotaInBytes, quotaInNamespace, bucketLayout);
+          usedNamespace, quotaInBytes, quotaInNamespace, bucketLayout,
+          defaultReplicationConfig);
     }
   }
 
@@ -553,6 +602,9 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
     if (bekInfo != null && bekInfo.getKeyName() != null) {
       bib.setBeinfo(OMPBHelper.convert(bekInfo));
     }
+    if (defaultReplicationConfig != null) {
+      bib.setDefaultReplicationConfig(defaultReplicationConfig.toProto());
+    }
     if (sourceVolume != null) {
       bib.setSourceVolume(sourceVolume);
     }
@@ -598,6 +650,10 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
       obib.setBucketLayout(
           BucketLayout.fromProto(bucketInfo.getBucketLayout()));
     }
+    if (bucketInfo.hasDefaultReplicationConfig()) {
+      obib.setDefaultReplicationConfig(
+          OMPBHelper.convert(bucketInfo.getDefaultReplicationConfig()));
+    }
     if (bucketInfo.hasObjectID()) {
       obib.setObjectID(bucketInfo.getObjectID());
     }
@@ -637,6 +693,7 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
         ", quotaInBytes='" + quotaInBytes + "'" +
         ", quotaInNamespace='" + quotaInNamespace + "'" +
         ", bucketLayout='" + bucketLayout + '\'' +
+        ", defaultReplicationConfig='" + defaultReplicationConfig + '\'' +
         sourceInfo +
         '}';
   }
@@ -664,7 +721,8 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
         Objects.equals(sourceVolume, that.sourceVolume) &&
         Objects.equals(sourceBucket, that.sourceBucket) &&
         Objects.equals(metadata, that.metadata) &&
-        Objects.equals(bekInfo, that.bekInfo);
+        Objects.equals(bekInfo, that.bekInfo) &&
+        Objects.equals(defaultReplicationConfig, that.defaultReplicationConfig);
   }
 
   @Override
@@ -692,6 +750,7 @@ public final class OmBucketInfo extends WithObjectID 
implements Auditable {
         ", quotaInBytes=" + quotaInBytes +
         ", quotaInNamespace=" + quotaInNamespace +
         ", bucketLayout=" + bucketLayout +
+        ", defaultReplicationConfig=" + defaultReplicationConfig +
         '}';
   }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
index 2ff2dc8..c943341 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/protocolPB/OMPBHelper.java
@@ -21,6 +21,11 @@ import com.google.protobuf.ByteString;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -141,6 +146,55 @@ public final class OMPBHelper {
         ezKeyVersionName);
   }
 
+  /**
+   * Converts the protobuf form of a default replication config into its
+   * {@link DefaultReplicationConfig} object form.
+   *
+   * @param defaultReplicationConfig protobuf message to convert.
+   * @return converted config; it carries the EC replication config when the
+   * type is EC and the replication factor for every other type.
+   * @throws IllegalArgumentException if the argument is null.
+   */
+  public static DefaultReplicationConfig convert(
+      HddsProtos.DefaultReplicationConfig defaultReplicationConfig) {
+    if (defaultReplicationConfig == null) {
+      throw new IllegalArgumentException(
+          "Invalid argument: default replication config is null");
+    }
+
+    final ReplicationType replicationType =
+        ReplicationType.fromProto(defaultReplicationConfig.getType());
+    switch (replicationType) {
+    case EC:
+      return new DefaultReplicationConfig(replicationType,
+          new ECReplicationConfig(
+              defaultReplicationConfig.getEcReplicationConfig()));
+    default:
+      return new DefaultReplicationConfig(replicationType,
+          ReplicationFactor.fromProto(defaultReplicationConfig.getFactor()));
+    }
+  }
+
+  /**
+   * Converts a {@link DefaultReplicationConfig} into its protobuf form. The
+   * type is always written; the factor and the EC replication config are
+   * written only when they are not null.
+   *
+   * @param defaultReplicationConfig config to convert.
+   * @return protobuf message for the given config.
+   * @throws IllegalArgumentException if the argument is null.
+   */
+  public static HddsProtos.DefaultReplicationConfig convert(
+      DefaultReplicationConfig defaultReplicationConfig) {
+    if (defaultReplicationConfig == null) {
+      throw new IllegalArgumentException(
+          "Invalid argument: default replication config is null");
+    }
+
+    final HddsProtos.DefaultReplicationConfig.Builder protoBuilder =
+        HddsProtos.DefaultReplicationConfig.newBuilder().setType(
+            ReplicationType.toProto(defaultReplicationConfig.getType()));
+
+    final ReplicationFactor factor = defaultReplicationConfig.getFactor();
+    if (factor != null) {
+      protoBuilder.setFactor(ReplicationFactor.toProto(factor));
+    }
+
+    final ECReplicationConfig ecConfig =
+        defaultReplicationConfig.getEcReplicationConfig();
+    if (ecConfig != null) {
+      protoBuilder.setEcReplicationConfig(ecConfig.toProto());
+    }
+
+    return protoBuilder.build();
+  }
+
   public static CipherSuite convert(CipherSuiteProto proto) {
     switch(proto) {
     case AES_CTR_NOPADDING:
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
index 650fc91..a742ab9 100644
--- 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.ozone.om.helpers;
 
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.StorageType;
 
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.junit.Assert;
 import org.junit.Test;
@@ -111,4 +115,52 @@ public class TestOmBucketInfo {
     Assert.assertEquals((int) 1, cloneBucketInfo.getAcls().size());
 
   }
+
+  /**
+   * Checks the protobuf round trip of the bucket default replication config:
+   * the field is absent when no default is set, and an EC(3, 2) default is
+   * written to and restored from the protobuf with the same values.
+   */
+  @Test
+  public void getProtobufMessageEC() {
+    // Bucket without any default replication config.
+    final OmBucketInfo plainBucket = OmBucketInfo.newBuilder()
+        .setBucketName("bucket")
+        .setVolumeName("vol1")
+        .setCreationTime(Time.now())
+        .setIsVersionEnabled(false)
+        .setStorageType(StorageType.ARCHIVE)
+        .setAcls(Collections.singletonList(
+            new OzoneAcl(IAccessAuthorizer.ACLIdentityType.USER,
+                "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
+                OzoneAcl.AclScope.ACCESS)))
+        .build();
+    final OzoneManagerProtocolProtos.BucketInfo plainProto =
+        plainBucket.getProtobuf();
+    Assert.assertFalse(plainProto.hasDefaultReplicationConfig());
+
+    // Rebuilding the object from the proto must not invent a default.
+    final OmBucketInfo plainRecovered =
+        OmBucketInfo.getFromProtobuf(plainProto);
+    Assert.assertNull(plainRecovered.getDefaultReplicationConfig());
+
+    // Bucket with an EC(3, 2) default replication config.
+    final OmBucketInfo ecBucket = OmBucketInfo.newBuilder()
+        .setBucketName("bucket")
+        .setVolumeName("vol1")
+        .setCreationTime(Time.now())
+        .setIsVersionEnabled(false)
+        .setStorageType(StorageType.ARCHIVE)
+        .setAcls(Collections.singletonList(
+            new OzoneAcl(IAccessAuthorizer.ACLIdentityType.USER,
+                "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
+                OzoneAcl.AclScope.ACCESS)))
+        .setDefaultReplicationConfig(
+            new DefaultReplicationConfig(ReplicationType.EC,
+                new ECReplicationConfig(3, 2)))
+        .build();
+    final OzoneManagerProtocolProtos.BucketInfo ecProto =
+        ecBucket.getProtobuf();
+
+    Assert.assertTrue(ecProto.hasDefaultReplicationConfig());
+    Assert.assertEquals(3,
+        ecProto.getDefaultReplicationConfig().getEcReplicationConfig()
+            .getData());
+    Assert.assertEquals(2,
+        ecProto.getDefaultReplicationConfig().getEcReplicationConfig()
+            .getParity());
+
+    // Rebuilding the object from the proto must restore the EC default.
+    final OmBucketInfo ecRecovered = OmBucketInfo.getFromProtobuf(ecProto);
+    Assert.assertEquals(ReplicationType.EC,
+        ecRecovered.getDefaultReplicationConfig().getType());
+    final ECReplicationConfig restoredEc =
+        ecRecovered.getDefaultReplicationConfig().getEcReplicationConfig();
+    Assert.assertNotNull(restoredEc);
+    Assert.assertEquals(3, restoredEc.getData());
+    Assert.assertEquals(2, restoredEc.getParity());
+  }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index c78b746..1ee7bce 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -58,11 +58,17 @@ public class TestOzoneConfigurationFields extends 
TestConfigurationFieldsBase {
     xmlPropsToSkipCompare.add("hadoop.tags.custom");
     xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID");
     xmlPropsToSkipCompare.add("ozone.scm.nodes.EXAMPLESCMSERVICEID");
+    xmlPropsToSkipCompare.add("ozone.scm.nodes.EXAMPLESCMSERVICEID");
     xmlPrefixToSkipCompare.add("ipc.client.rpc-timeout.ms");
     xmlPropsToSkipCompare.add("ozone.om.leader.election.minimum.timeout" +
         ".duration"); // Deprecated config
     configurationPropsToSkipCompare
         .add(ScmConfig.ConfigStrings.HDDS_SCM_INIT_DEFAULT_LAYOUT_VERSION);
+    // Currently replication and type configs moved to server side.
+    configurationPropsToSkipCompare
+        .add(OzoneConfigKeys.OZONE_REPLICATION);
+    configurationPropsToSkipCompare
+        .add(OzoneConfigKeys.OZONE_REPLICATION_TYPE);
     addPropertiesNotInXml();
   }
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 06f0646..28f7c2c 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -18,7 +18,10 @@ package org.apache.hadoop.ozone.client.rpc;
 
 import com.google.common.cache.Cache;
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -27,11 +30,15 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.TestHelper;
 import org.junit.AfterClass;
@@ -40,12 +47,14 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
@@ -64,6 +73,9 @@ public class TestECKeyOutputStream {
   private static String volumeName;
   private static String bucketName;
   private static String keyString;
+  private static int dataBlocks = 3;
+  private static int parityBlocks = 2;
+  private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -99,6 +111,7 @@ public class TestECKeyOutputStream {
     bucketName = volumeName;
     objectStore.createVolume(volumeName);
     objectStore.getVolume(volumeName).createBucket(bucketName);
+    initInputChunks();
   }
 
   /**
@@ -121,6 +134,61 @@ public class TestECKeyOutputStream {
   }
 
   @Test
+  public void testCreateKeyWithOutBucketDefaults() throws Exception {
+    OzoneVolume volume = objectStore.getVolume(volumeName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    try (OzoneOutputStream out = bucket.createKey("myKey", 2000)) {
+      Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
+      for (int i = 0; i < inputChunks.length; i++) {
+        out.write(inputChunks[i]);
+      }
+    }
+  }
+
+  /**
+   * Creates a bucket with an EC(3, 2) default replication config and checks
+   * that a key created without a replication config is written through an
+   * {@link ECKeyOutputStream}.
+   */
+  @Test
+  public void testCreateKeyWithBucketDefaults() throws Exception {
+    final String ecBucketName = UUID.randomUUID().toString();
+    final OzoneVolume volume = objectStore.getVolume(volumeName);
+    final BucketArgs.Builder argsBuilder = BucketArgs.newBuilder();
+    argsBuilder.setDefaultReplicationConfig(
+        new DefaultReplicationConfig(ReplicationType.EC,
+            new ECReplicationConfig(3, 2)));
+    volume.createBucket(ecBucketName, argsBuilder.build());
+
+    final OzoneBucket bucket = volume.getBucket(ecBucketName);
+    try (OzoneOutputStream stream = bucket.createKey(keyString, 2000)) {
+      Assert.assertTrue(stream.getOutputStream() instanceof ECKeyOutputStream);
+      for (byte[] chunk : inputChunks) {
+        stream.write(chunk);
+      }
+    }
+  }
+
+  /**
+   * Creates a bucket with an EC(3, 2) default replication config and checks
+   * that a key created with an explicit Ratis replication config is still
+   * written through a {@link KeyOutputStream}.
+   */
+  @Test
+  public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
+    final String ecBucketName = UUID.randomUUID().toString();
+    final OzoneVolume volume = objectStore.getVolume(volumeName);
+    final BucketArgs.Builder argsBuilder = BucketArgs.newBuilder();
+    argsBuilder.setDefaultReplicationConfig(
+        new DefaultReplicationConfig(ReplicationType.EC,
+            new ECReplicationConfig(3, 2)));
+    volume.createBucket(ecBucketName, argsBuilder.build());
+
+    final OzoneBucket bucket = volume.getBucket(ecBucketName);
+    try (OzoneOutputStream stream = bucket.createKey(
+        "testCreateRatisKeyAndWithECBucketDefaults", 2000,
+        new RatisReplicationConfig("3"), new HashMap<>())) {
+      Assert.assertTrue(stream.getOutputStream() instanceof KeyOutputStream);
+      for (byte[] chunk : inputChunks) {
+        stream.write(chunk);
+      }
+    }
+  }
+
+
+
+  @Test
   public void testECKeyXceiverClientShouldNotUseCachedKeysForDifferentStreams()
       throws Exception {
     int data = 3;
@@ -131,14 +199,16 @@ public class TestECKeyOutputStream {
       final List<BlockOutputStreamEntry> streamEntries =
           ((ECKeyOutputStream) key.getOutputStream()).getStreamEntries();
       Assert.assertEquals(data + parity, streamEntries.size());
+      final Cache<String, XceiverClientSpi> clientCache =
+          ((XceiverClientManager) ((ECKeyOutputStream) key.getOutputStream())
+              .getXceiverClientFactory()).getClientCache();
+      clientCache.invalidateAll();
+      clientCache.cleanUp();
       final Pipeline firstStreamPipeline = streamEntries.get(0).getPipeline();
       XceiverClientSpi xceiverClientSpi =
           ((ECKeyOutputStream) key.getOutputStream()).getXceiverClientFactory()
               .acquireClient(firstStreamPipeline);
       Assert.assertNotNull(xceiverClientSpi);
-      final Cache<String, XceiverClientSpi> clientCache =
-          ((XceiverClientManager) ((ECKeyOutputStream) key.getOutputStream())
-              .getXceiverClientFactory()).getClientCache();
       final String firstCacheKey =
           clientCache.asMap().entrySet().iterator().next().getKey();
       List<String> prevVisitedKeys = new ArrayList<>();
@@ -172,4 +242,18 @@ public class TestECKeyOutputStream {
     }
     return null;
   }
+
+  /**
+   * Fills each of the dataBlocks entries of inputChunks with the bytes
+   * returned by getBytesWith(i + 1, chunkSize), where i is the entry index.
+   */
+  private static void initInputChunks() {
+    for (int i = 0; i < dataBlocks; i++) {
+      inputChunks[i] = getBytesWith(i + 1, chunkSize);
+    }
+  }
+
+  /**
+   * Builds a byte array made of the decimal text of the given number
+   * repeated {@code total} times, encoded as UTF-8.
+   *
+   * @param singleDigitNumber number whose decimal text is repeated.
+   * @param total how many times the number is appended.
+   * @return the UTF-8 bytes of the repeated text; empty when total is not
+   * positive.
+   */
+  private static byte[] getBytesWith(int singleDigitNumber, int total) {
+    // StringBuilder(int) takes an initial capacity, not content, so size it
+    // for the expected output rather than by the digit being repeated.
+    StringBuilder builder = new StringBuilder(Math.max(total, 0));
+    for (int i = 0; i < total; i++) {
+      builder.append(singleDigitNumber);
+    }
+    return builder.toString().getBytes(UTF_8);
+  }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
index ac43c24..e8c0953 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmAcls.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.IOzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.ozone.test.GenericTestUtils;
 
@@ -144,7 +145,6 @@ public class TestOmAcls {
     logCapturer.clearOutput();
 
     TestOmAcls.aclAllow = false;
-
     OzoneTestUtils.expectOmException(ResultCodes.PERMISSION_DENIED,
         () -> TestDataUtil.createKey(bucket, "testKey", "testcontent"));
     assertTrue(logCapturer.getOutput().contains("doesn't have CREATE " +
@@ -158,6 +158,13 @@ public class TestOmAcls {
 
+    /**
+     * Grants access when the resource type prints as "bucket" and the
+     * requested ACL right prints as "READ"; every other request is decided
+     * by the TestOmAcls.aclAllow flag.
+     */
     @Override
     public boolean checkAccess(IOzoneObj ozoneObject, RequestContext context) {
+      // Allow bucket read access. While creating key, we access bucket to
+      // inherit the replication config.
+      if (((OzoneObjInfo) ozoneObject).getResourceType().toString()
+          .equals("bucket") && "READ"
+          .equals(context.getAclRights().toString())) {
+        return true;
+      }
       return TestOmAcls.aclAllow;
     }
   }
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index db19c37..da1a6b0 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -543,6 +543,7 @@ message BucketInfo {
     optional int64 quotaInNamespace = 16 [default = -2];
     optional uint64 usedNamespace = 17;
     optional BucketLayoutProto bucketLayout = 18;
+    optional hadoop.hdds.DefaultReplicationConfig defaultReplicationConfig = 
19;
 }
 
 enum StorageTypeProto {
@@ -620,6 +621,7 @@ message BucketArgs {
     repeated hadoop.hdds.KeyValue metadata = 7;
     optional uint64 quotaInBytes = 8;
     optional uint64 quotaInNamespace = 9;
+    optional hadoop.hdds.DefaultReplicationConfig defaultReplicationConfig = 
10;
 }
 
 message PrefixInfo {
@@ -756,8 +758,8 @@ message KeyArgs {
     required string bucketName = 2;
     required string keyName = 3;
     optional uint64 dataSize = 4;
-    optional hadoop.hdds.ReplicationType type = 5;
-    optional hadoop.hdds.ReplicationFactor factor = 6;
+    optional hadoop.hdds.ReplicationType type = 5 [default = NONE];
+    optional hadoop.hdds.ReplicationFactor factor = 6 [default = ZERO];
     repeated KeyLocation keyLocations = 7;
     optional bool isMultipartKey = 8;
     optional string multipartUploadID = 9;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneConfigUtil.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneConfigUtil.java
new file mode 100644
index 0000000..f8a2add
--- /dev/null
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneConfigUtil.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * Utility class for ozone configurations.
+ */
+public final class OzoneConfigUtil {
+  private OzoneConfigUtil() {
+  }
+
+  /**
+   * Picks the replication config to use, in order of preference: the one
+   * passed by the client, then the bucket default, then the OM default.
+   *
+   * @param clientType replication type sent by the client; NONE means the
+   * client did not choose a replication config.
+   * @param clientFactor replication factor sent by the client.
+   * @param clientECReplicationConfig EC replication config sent by the client.
+   * @param bucketDefaultReplicationConfig bucket default, may be null.
+   * @param omDefaultReplicationConfig OM default, used as the last resort.
+   * @return the resolved replication config.
+   */
+  public static ReplicationConfig resolveReplicationConfigPreference(
+      HddsProtos.ReplicationType clientType,
+      HddsProtos.ReplicationFactor clientFactor,
+      HddsProtos.ECReplicationConfig clientECReplicationConfig,
+      DefaultReplicationConfig bucketDefaultReplicationConfig,
+      ReplicationConfig omDefaultReplicationConfig) {
+    if (clientType != HddsProtos.ReplicationType.NONE) {
+      // Client passed the replication config, so let's use it.
+      return ReplicationConfig
+          .fromProto(clientType, clientFactor, clientECReplicationConfig);
+    }
+
+    if (bucketDefaultReplicationConfig == null) {
+      // Neither the client nor the bucket chose one, use server defaults.
+      return omDefaultReplicationConfig;
+    }
+
+    // Since bucket defaults are available, let's inherit them. A bucket
+    // default may carry a null factor or a null EC config (only one of the
+    // two is set for a given type), so convert each one only when present
+    // instead of dereferencing it unconditionally.
+    final HddsProtos.ReplicationFactor factorProto =
+        bucketDefaultReplicationConfig.getFactor() == null ?
+            null :
+            ReplicationFactor
+                .toProto(bucketDefaultReplicationConfig.getFactor());
+    final boolean hasEcConfig =
+        bucketDefaultReplicationConfig.getEcReplicationConfig() != null;
+    return ReplicationConfig.fromProto(
+        ReplicationType.toProto(bucketDefaultReplicationConfig.getType()),
+        factorProto,
+        hasEcConfig ?
+            bucketDefaultReplicationConfig.getEcReplicationConfig().toProto() :
+            null);
+  }
+}
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b144371..4da72c3 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -60,6 +60,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
@@ -227,6 +229,10 @@ import static 
org.apache.hadoop.ozone.OzoneConsts.PREPARE_MARKER_KEY;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_KEY;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS_DEFAULT;
@@ -2441,7 +2447,13 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     auditMap.put(OzoneConsts.BUCKET, bucket);
     try {
       metrics.incNumBucketInfos();
-      return bucketManager.getBucketInfo(volume, bucket);
+      final OmBucketInfo bucketInfo =
+          bucketManager.getBucketInfo(volume, bucket);
+      if (bucketInfo != null && bucketInfo
+          .getDefaultReplicationConfig() == null) {
+        bucketInfo.setDefaultReplicationConfig(getDefaultReplicationConfig());
+      }
+      return bucketInfo;
     } catch (Exception ex) {
       metrics.incNumBucketInfoFails();
       auditSuccess = false;
@@ -4060,6 +4072,15 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
         .getTrimmed(OZONE_OM_METADATA_LAYOUT, 
OZONE_OM_METADATA_LAYOUT_DEFAULT);
   }
 
+  /**
+   * Builds the server-side default replication config from configuration.
+   *
+   * Reads the values stored under OZONE_SERVER_DEFAULT_REPLICATION_KEY and
+   * OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY, falling back to their
+   * *_DEFAULT constants when unset. Values are read trimmed so that stray
+   * whitespace in a config file does not break the ReplicationType lookup.
+   *
+   * @return the replication config built from the configured type and
+   *         replication value.
+   * @throws IllegalArgumentException presumably, when the configured type
+   *         does not name a ReplicationType constant (valueOf lookup).
+   */
+  public ReplicationConfig getDefaultReplicationConfig() {
+    final String replication = configuration.getTrimmed(
+        OZONE_SERVER_DEFAULT_REPLICATION_KEY,
+        OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT);
+    final String type = configuration.getTrimmed(
+        OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY,
+        OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT);
+    return ReplicationConfig
+        .fromTypeAndString(ReplicationType.valueOf(type), replication);
+  }
+
   /**
    * Create volume which is required for S3Gateway operations.
    * @throws IOException
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
index a8e8547..2047ef6 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequest.java
@@ -28,7 +28,8 @@ import java.util.Map;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -356,8 +357,8 @@ public class OMDirectoryCreateRequest extends OMKeyRequest {
         .setCreationTime(keyArgs.getModificationTime())
         .setModificationTime(keyArgs.getModificationTime())
         .setDataSize(0)
-        .setReplicationConfig(ReplicationConfig
-                .fromTypeAndFactor(keyArgs.getType(), keyArgs.getFactor()))
+        .setReplicationConfig(new StandaloneReplicationConfig(
+            HddsProtos.ReplicationFactor.ONE))
         .setObjectID(objectId)
         .setUpdateID(objectId);
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
index 73f2f07..c51324b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequest.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponse;
 import org.slf4j.Logger;
@@ -125,8 +126,14 @@ public class OMFileCreateRequest extends OMKeyRequest {
       type = useRatis ? HddsProtos.ReplicationType.RATIS :
           HddsProtos.ReplicationType.STAND_ALONE;
     }
-    ReplicationConfig repConfig = ReplicationConfig.fromProto(
-        type, factor, keyArgs.getEcReplicationConfig());
+
+    final OmBucketInfo bucketInfo = ozoneManager
+        .getBucketInfo(keyArgs.getVolumeName(), keyArgs.getBucketName());
+    final ReplicationConfig repConfig = OzoneConfigUtil
+        .resolveReplicationConfigPreference(type, factor,
+            keyArgs.getEcReplicationConfig(),
+            bucketInfo.getDefaultReplicationConfig(),
+            ozoneManager.getDefaultReplicationConfig());
 
     // TODO: Here we are allocating block with out any check for
     //  bucket/key/volume or not and also with out any authorization checks.
@@ -244,14 +251,19 @@ public class OMFileCreateRequest extends OMKeyRequest {
       }
 
       // do open key
-      OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
-          omMetadataManager.getBucketKey(volumeName, bucketName));
+      omBucketInfo =
+          getBucketInfo(omMetadataManager, volumeName, bucketName);
+      final ReplicationConfig repConfig = OzoneConfigUtil
+          .resolveReplicationConfigPreference(keyArgs.getType(),
+              keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+              omBucketInfo.getDefaultReplicationConfig(),
+              ozoneManager.getDefaultReplicationConfig());
 
       omKeyInfo = prepareKeyInfo(omMetadataManager, keyArgs, dbKeyInfo,
           keyArgs.getDataSize(), locations, getFileEncryptionInfo(keyArgs),
-          ozoneManager.getPrefixManager(), bucketInfo, trxnLogIndex,
+          ozoneManager.getPrefixManager(), omBucketInfo, trxnLogIndex,
           ozoneManager.getObjectIdFromTxId(trxnLogIndex),
-          ozoneManager.isRatisEnabled());
+          ozoneManager.isRatisEnabled(), repConfig);
 
       long openVersion = omKeyInfo.getLatestVersionLocations().getVersion();
       long clientID = createFileRequest.getClientID();
@@ -267,8 +279,6 @@ public class OMFileCreateRequest extends OMKeyRequest {
           .stream().map(OmKeyLocationInfo::getFromProtobuf)
           .collect(Collectors.toList());
       omKeyInfo.appendNewBlocks(newLocationList, false);
-
-      omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
       // check bucket and volume quota
       long preAllocatedSpace = newLocationList.size()
           * ozoneManager.getScmBlockSize()
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
index d792222..1fdb723 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMFileCreateRequestWithFSO.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.ozone.om.request.file;
 
 import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -162,13 +164,18 @@ public class OMFileCreateRequestWithFSO extends 
OMFileCreateRequest {
       // do open key
       OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
           omMetadataManager.getBucketKey(volumeName, bucketName));
+      final ReplicationConfig repConfig = OzoneConfigUtil
+          .resolveReplicationConfigPreference(keyArgs.getType(),
+              keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+              bucketInfo.getDefaultReplicationConfig(),
+              ozoneManager.getDefaultReplicationConfig());
 
       OmKeyInfo omFileInfo = prepareFileInfo(omMetadataManager, keyArgs,
               dbFileInfo, keyArgs.getDataSize(), locations,
               getFileEncryptionInfo(keyArgs), ozoneManager.getPrefixManager(),
               bucketInfo, pathInfoFSO, trxnLogIndex,
               pathInfoFSO.getLeafNodeObjectId(),
-              ozoneManager.isRatisEnabled());
+              ozoneManager.isRatisEnabled(), repConfig);
 
       long openVersion = omFileInfo.getLatestVersionLocations().getVersion();
       long clientID = createFileRequest.getClientID();
@@ -184,9 +191,9 @@ public class OMFileCreateRequestWithFSO extends 
OMFileCreateRequest {
 
       omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
       // check bucket and volume quota
-      long preAllocatedSpace = newLocationList.size()
-              * ozoneManager.getScmBlockSize()
-              * omFileInfo.getReplicationConfig().getRequiredNodes();
+      long preAllocatedSpace =
+          newLocationList.size() * ozoneManager.getScmBlockSize() * repConfig
+              .getRequiredNodes();
       checkBucketQuotaInBytes(omBucketInfo, preAllocatedSpace);
       checkBucketQuotaInNamespace(omBucketInfo, 1L);
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
index 9ac43c9..f6a954f 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMAllocateBlockRequest.java
@@ -222,8 +222,9 @@ public class OMAllocateBlockRequest extends OMKeyRequest {
       omBucketInfo.incrUsedBytes(preAllocatedSpace);
       omResponse.setAllocateBlockResponse(AllocateBlockResponse.newBuilder()
           .setKeyLocation(blockLocation).build());
+      OmBucketInfo shortBucketInfo = omBucketInfo.copyObject();
       omClientResponse = new OMAllocateBlockResponse(omResponse.build(),
-          openKeyInfo, clientID, omBucketInfo.copyObject());
+          openKeyInfo, clientID, shortBucketInfo);
 
       LOG.debug("Allocated block for Volume:{}, Bucket:{}, OpenKey:{}",
           volumeName, bucketName, openKeyName);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
index 5fdfe6c..564c3d7 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.request.file.OMDirectoryCreateRequest;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
@@ -142,8 +143,13 @@ public class OMKeyCreateRequest extends OMKeyRequest {
         type = useRatis ? HddsProtos.ReplicationType.RATIS :
             HddsProtos.ReplicationType.STAND_ALONE;
       }
-      ReplicationConfig repConfig = ReplicationConfig.fromProto(
-          type, factor, keyArgs.getEcReplicationConfig());
+      final OmBucketInfo bucketInfo = ozoneManager
+          .getBucketInfo(keyArgs.getVolumeName(), keyArgs.getBucketName());
+      final ReplicationConfig repConfig = OzoneConfigUtil
+          .resolveReplicationConfigPreference(type, factor,
+              keyArgs.getEcReplicationConfig(),
+              bucketInfo.getDefaultReplicationConfig(),
+              ozoneManager.getDefaultReplicationConfig());
 
       // TODO: Here we are allocating block with out any check for
       //  bucket/key/volume or not and also with out any authorization checks.
@@ -278,11 +284,17 @@ public class OMKeyCreateRequest extends OMKeyRequest {
         numMissingParents = missingParentInfos.size();
       }
 
+      ReplicationConfig replicationConfig = OzoneConfigUtil
+          .resolveReplicationConfigPreference(keyArgs.getType(),
+              keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+              bucketInfo.getDefaultReplicationConfig(),
+              ozoneManager.getDefaultReplicationConfig());
+
       omKeyInfo = prepareKeyInfo(omMetadataManager, keyArgs, dbKeyInfo,
           keyArgs.getDataSize(), locations, getFileEncryptionInfo(keyArgs),
           ozoneManager.getPrefixManager(), bucketInfo, trxnLogIndex,
           ozoneManager.getObjectIdFromTxId(trxnLogIndex),
-          ozoneManager.isRatisEnabled());
+          ozoneManager.isRatisEnabled(), replicationConfig);
 
       long openVersion = omKeyInfo.getLatestVersionLocations().getVersion();
       long clientID = createKeyRequest.getClientID();
@@ -305,7 +317,7 @@ public class OMKeyCreateRequest extends OMKeyRequest {
       // commitKey.
       long preAllocatedSpace = newLocationList.size()
           * ozoneManager.getScmBlockSize()
-          * omKeyInfo.getReplicationConfig().getRequiredNodes();
+          * replicationConfig.getRequiredNodes();
       // check bucket and volume quota
       checkBucketQuotaInBytes(omBucketInfo, preAllocatedSpace);
       checkBucketQuotaInNamespace(omBucketInfo, 1L);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
index 654c251..6125f9d 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequestWithFSO.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.ozone.om.request.key;
 
 import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -145,13 +147,18 @@ public class OMKeyCreateRequestWithFSO extends 
OMKeyCreateRequest {
       // do open key
       OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
               omMetadataManager.getBucketKey(volumeName, bucketName));
+      final ReplicationConfig repConfig = OzoneConfigUtil
+          .resolveReplicationConfigPreference(keyArgs.getType(),
+              keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+              bucketInfo.getDefaultReplicationConfig(),
+              ozoneManager.getDefaultReplicationConfig());
 
       OmKeyInfo omFileInfo = prepareFileInfo(omMetadataManager, keyArgs,
               dbFileInfo, keyArgs.getDataSize(), locations,
               getFileEncryptionInfo(keyArgs), ozoneManager.getPrefixManager(),
               bucketInfo, pathInfoFSO, trxnLogIndex,
               pathInfoFSO.getLeafNodeObjectId(),
-              ozoneManager.isRatisEnabled());
+              ozoneManager.isRatisEnabled(), repConfig);
 
       long openVersion = omFileInfo.getLatestVersionLocations().getVersion();
       long clientID = createKeyRequest.getClientID();
@@ -167,9 +174,9 @@ public class OMKeyCreateRequestWithFSO extends 
OMKeyCreateRequest {
 
       omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
       // check bucket and volume quota
-      long preAllocatedSpace = newLocationList.size()
-              * ozoneManager.getScmBlockSize()
-              * omFileInfo.getReplicationConfig().getRequiredNodes();
+      long preAllocatedSpace =
+          newLocationList.size() * ozoneManager.getScmBlockSize() * repConfig
+              .getRequiredNodes();
       checkBucketQuotaInBytes(omBucketInfo, preAllocatedSpace);
       checkBucketQuotaInNamespace(omBucketInfo, 1L);
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index bfd7bb7..3410641 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -585,12 +585,13 @@ public abstract class OMKeyRequest extends 
OMClientRequest {
           @Nullable FileEncryptionInfo encInfo,
           @Nonnull PrefixManager prefixManager,
           @Nullable OmBucketInfo omBucketInfo,
-          long transactionLogIndex, long objectID, boolean isRatisEnabled)
+          long transactionLogIndex, long objectID, boolean isRatisEnabled,
+          ReplicationConfig replicationConfig)
           throws IOException {
 
     return prepareFileInfo(omMetadataManager, keyArgs, dbKeyInfo, size,
             locations, encInfo, prefixManager, omBucketInfo, null,
-            transactionLogIndex, objectID, isRatisEnabled);
+            transactionLogIndex, objectID, isRatisEnabled, replicationConfig);
   }
 
   /**
@@ -608,7 +609,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
           @Nullable OmBucketInfo omBucketInfo,
           OMFileRequest.OMPathInfoWithFSO omPathInfo,
           long transactionLogIndex, long objectID,
-          boolean isRatisEnabled)
+          boolean isRatisEnabled, ReplicationConfig replicationConfig)
           throws IOException {
     if (keyArgs.getIsMultipartKey()) {
       return prepareMultipartFileInfo(omMetadataManager, keyArgs,
@@ -633,11 +634,9 @@ public abstract class OMKeyRequest extends OMClientRequest 
{
 
     // the key does not exist, create a new object.
     // Blocks will be appended as version 0.
-    return createFileInfo(keyArgs, locations, ReplicationConfig
-            .fromProto(keyArgs.getType(), keyArgs.getFactor(),
-                keyArgs.getEcReplicationConfig()), keyArgs.getDataSize(),
-        encInfo, prefixManager, omBucketInfo, omPathInfo, transactionLogIndex,
-        objectID);
+    return createFileInfo(keyArgs, locations, replicationConfig,
+            keyArgs.getDataSize(), encInfo, prefixManager,
+            omBucketInfo, omPathInfo, transactionLogIndex, objectID);
   }
 
   /**
@@ -654,7 +653,6 @@ public abstract class OMKeyRequest extends OMClientRequest {
       @Nullable OmBucketInfo omBucketInfo,
       OMFileRequest.OMPathInfoWithFSO omPathInfo,
       long transactionLogIndex, long objectID) {
-
     OmKeyInfo.Builder builder = new OmKeyInfo.Builder();
     builder.setVolumeName(keyArgs.getVolumeName())
             .setBucketName(keyArgs.getBucketName())
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
index 5746937..39f4057 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequest.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -164,9 +165,14 @@ public class S3InitiateMultipartUploadRequest extends 
OMKeyRequest {
       // also like this, even when key exists in a bucket, user can still
       // initiate MPU.
 
-      final ReplicationConfig replicationConfig =
-          ReplicationConfig.fromTypeAndFactor(
-              keyArgs.getType(), keyArgs.getFactor());
+      final OmBucketInfo bucketInfo = omMetadataManager.getBucketTable()
+          .get(omMetadataManager.getBucketKey(volumeName, bucketName));
+      final ReplicationConfig replicationConfig = OzoneConfigUtil
+          .resolveReplicationConfigPreference(keyArgs.getType(),
+              keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+              bucketInfo != null ?
+                  bucketInfo.getDefaultReplicationConfig() :
+                  null, ozoneManager.getDefaultReplicationConfig());
 
       multipartKeyInfo = new OmMultipartKeyInfo.Builder()
           .setUploadID(keyArgs.getMultipartUploadID())
@@ -177,9 +183,6 @@ public class S3InitiateMultipartUploadRequest extends 
OMKeyRequest {
           .setUpdateID(transactionLogIndex)
           .build();
 
-      OmBucketInfo bucketInfo = omMetadataManager.getBucketTable().get(
-          omMetadataManager.getBucketKey(volumeName, bucketName));
-
       omKeyInfo = new OmKeyInfo.Builder()
           .setVolumeName(volumeName)
           .setBucketName(bucketName)
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
index da13536..90c44ed 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3InitiateMultipartUploadRequestWithFSO.java
@@ -24,8 +24,10 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneConfigUtil;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
@@ -160,9 +162,14 @@ public class S3InitiateMultipartUploadRequestWithFSO
       // care of in the final complete multipart upload. AWS S3 behavior is
       // also like this, even when key exists in a bucket, user can still
       // initiate MPU.
-      final ReplicationConfig replicationConfig =
-              ReplicationConfig.fromTypeAndFactor(
-                      keyArgs.getType(), keyArgs.getFactor());
+      final OmBucketInfo bucketInfo = omMetadataManager.getBucketTable()
+          .get(omMetadataManager.getBucketKey(volumeName, bucketName));
+      final ReplicationConfig replicationConfig = OzoneConfigUtil
+          .resolveReplicationConfigPreference(keyArgs.getType(),
+              keyArgs.getFactor(), keyArgs.getEcReplicationConfig(),
+              bucketInfo != null ?
+                  bucketInfo.getDefaultReplicationConfig() :
+                  null, ozoneManager.getDefaultReplicationConfig());
 
       multipartKeyInfo = new OmMultipartKeyInfo.Builder()
           .setUploadID(keyArgs.getMultipartUploadID())
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 93b7787..94107c4 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
 import org.apache.hadoop.ozone.om.ResolvedBucket;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.KeyManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
@@ -134,6 +135,8 @@ public class TestOMKeyRequest {
     when(ozoneManager.isAdmin(any(String.class))).thenReturn(true);
     when(ozoneManager.isAdmin(any(UserGroupInformation.class)))
         .thenReturn(true);
+    when(ozoneManager.getBucketInfo(anyString(), anyString())).thenReturn(
+        new 
OmBucketInfo.Builder().setVolumeName("").setBucketName("").build());
     Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
 
     scmClient = Mockito.mock(ScmClient.class);
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
index 5fe2c0d..aa1230f 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
@@ -22,6 +22,8 @@ package org.apache.hadoop.ozone.om.request.s3.multipart;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.junit.After;
 import org.junit.Assert;
@@ -47,6 +49,8 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
 
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
@@ -82,6 +86,10 @@ public class TestS3MultipartRequest {
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
     auditLogger = Mockito.mock(AuditLogger.class);
     when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+    when(ozoneManager.getDefaultReplicationConfig()).thenReturn(
+        ReplicationConfig.fromTypeAndString(ReplicationType
+                .valueOf(OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT),
+            OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT));
     Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
     when(ozoneManager.resolveBucketLink(any(KeyArgs.class),
         any(OMClientRequest.class)))
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
index 8d98882..65f2d5a 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
@@ -21,6 +21,8 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -69,6 +71,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -134,6 +138,10 @@ public class TestCleanupTableInfo {
         );
     when(om.getAclsEnabled()).thenReturn(false);
     when(om.getAuditLogger()).thenReturn(mock(AuditLogger.class));
+    when(om.getDefaultReplicationConfig()).thenReturn(ReplicationConfig
+        .fromTypeAndString(ReplicationType
+                .valueOf(OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT),
+            OZONE_SERVER_DEFAULT_REPLICATION_DEFAULT));
     addVolumeToMetaTable(aVolumeArgs());
     addBucketToMetaTable(aBucketInfo());
   }
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index 6bcb689..e6e664e 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -160,6 +160,12 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
 
   @Override
   public short getDefaultReplication() {
+    if (replicationConfig == null) {
+      // To provide backward compatibility, we are just returning 3.
+      // However, we need to handle this with the correct behavior.
+      // TODO: Please see HDDS-5646
+      return (short) ReplicationFactor.THREE.getValue();
+    }
     return (short) replicationConfig.getRequiredNodes();
   }
 
@@ -194,19 +200,24 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
     incrementCounter(Statistic.OBJECTS_CREATED, 1);
     try {
       OzoneOutputStream ozoneOutputStream = null;
+      ReplicationConfig replConfig = this.replicationConfig;
+      // Since the bucket has the right default replication, we are using it.
+      if (bucket.getReplicationConfig() != null) {
+        replConfig = bucket.getReplicationConfig();
+      }
       if (replication == ReplicationFactor.ONE.getValue()
           || replication == ReplicationFactor.THREE.getValue()) {
 
         ReplicationConfig customReplicationConfig =
             ReplicationConfig.adjustReplication(
-                replicationConfig, replication
+                replConfig, replication
             );
         ozoneOutputStream =
             bucket.createFile(key, 0, customReplicationConfig, overWrite,
                 recursive);
       } else {
         ozoneOutputStream =
-            bucket.createFile(key, 0, replicationConfig, overWrite, recursive);
+            bucket.createFile(key, 0, replConfig, overWrite, recursive);
       }
       return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
     } catch (OMException ex) {
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index 1f8f25e..ee17992 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -266,6 +266,12 @@ public class BasicRootedOzoneClientAdapterImpl
 
   @Override
   public short getDefaultReplication() {
+    if (replicationConfig == null) {
+      // To provide backward compatibility, we are just returning 3;
+      // however, this needs to be replaced with the correct behavior.
+      // TODO: Please see HDDS-5646
+      return (short) ReplicationFactor.THREE.getValue();
+    }
     return (short) replicationConfig.getRequiredNodes();
   }
 
@@ -309,16 +315,21 @@ public class BasicRootedOzoneClientAdapterImpl
     try {
       // Hadoop CopyCommands class always sets recursive to true
       OzoneBucket bucket = getBucket(ofsPath, recursive);
+      ReplicationConfig replConfig = this.replicationConfig;
+      // Since the bucket has the right default replication, we are using it.
+      if (bucket.getReplicationConfig() != null) {
+        replConfig = bucket.getReplicationConfig();
+      }
       OzoneOutputStream ozoneOutputStream = null;
       if (replication == ReplicationFactor.ONE.getValue()
           || replication == ReplicationFactor.THREE.getValue()) {
 
         ozoneOutputStream = bucket.createFile(key, 0,
-            ReplicationConfig.adjustReplication(replicationConfig, 
replication),
+            ReplicationConfig.adjustReplication(replConfig, replication),
             overWrite, recursive);
       } else {
         ozoneOutputStream =
-            bucket.createFile(key, 0, replicationConfig, overWrite, recursive);
+            bucket.createFile(key, 0, replConfig, overWrite, recursive);
       }
       return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
     } catch (OMException ex) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to