This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 78ccdd54 HDDS-6131: EC: Replication config from bucket should be 
refreshed in o3fs (#3008)
78ccdd54 is described below

commit 78ccdd540cb3381630f9669529ff5d7a82759ed1
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Thu Jan 27 09:45:09 2022 -0800

    HDDS-6131: EC: Replication config from bucket should be refreshed in o3fs 
(#3008)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   7 ++
 .../common/src/main/resources/ozone-default.xml    |  12 ++
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |  70 ++++++++++-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   4 -
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |  72 +++++++----
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |  43 +++----
 .../apache/hadoop/fs/ozone/OzoneClientUtils.java   |  83 +++++++++++++
 .../hadoop/fs/ozone/TestOzoneClientUtils.java      | 131 +++++++++++++++++++++
 8 files changed, 365 insertions(+), 57 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index f719571..51c0446 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -472,6 +472,13 @@ public final class OzoneConfigKeys {
   // The protocol starts at 2.0.0 and a null or empty value for older versions.
   public static final String OZONE_OM_CLIENT_PROTOCOL_VERSION = "2.0.0";
 
+  public static final String
+      OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_MS =
+      "ozone.client.bucket.replication.config.refresh.time.ms";
+  public static final long
+      OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_DEFAULT_MS =
+      300 * 1000;
+
   /**
    * There is no need to instantiate this class.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 3e2b5e9..ce4ad1e 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1180,6 +1180,18 @@
   </property>
 
   <property>
+    <name>ozone.client.bucket.replication.config.refresh.time.ms</name>
+    <value>30000</value>
+    <tag>OZONE</tag>
+    <description>
+      Default time period to refresh the bucket replication config in o3fs
+      clients. Until the bucket replication config is refreshed, the client
+      will continue to use the existing replication config irrespective of
+      whether the bucket replication config is updated at OM or not.
+    </description>
+  </property>
+
+  <property>
     <name>hdds.container.close.threshold</name>
     <value>0.9f</value>
     <tag>OZONE, DATANODE</tag>
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index 6491f88..2a5a69b 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.ozone;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -29,6 +31,7 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,8 +45,12 @@ import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.TrashPolicy;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.TestDataUtil;
@@ -81,6 +88,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.ozone.test.LambdaTestUtils;
+import org.apache.ozone.test.TestClock;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -164,7 +172,7 @@ public class TestOzoneFileSystem {
     conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
         bucketLayout.name());
     cluster = MiniOzoneCluster.newBuilder(conf)
-            .setNumDatanodes(3)
+            .setNumDatanodes(5)
             .build();
     cluster.waitForClusterToBeReady();
 
@@ -1186,6 +1194,66 @@ public class TestOzoneFileSystem {
   }
 
   @Test
+  public void testCreateKeyShouldUseRefreshedBucketReplicationConfig()
+      throws IOException {
+    OzoneBucket bucket =
+        TestDataUtil.createVolumeAndBucket(cluster, bucketLayout);
+    final TestClock testClock = new TestClock(Instant.now(), ZoneOffset.UTC);
+
+    String rootPath = String
+        .format("%s://%s.%s/", OzoneConsts.OZONE_URI_SCHEME, bucket.getName(),
+            bucket.getVolumeName());
+
+    // Set the fs.defaultFS and start the filesystem
+    Configuration conf = new OzoneConfiguration(cluster.getConf());
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    // Set the number of keys to be processed during batch operate.
+    OzoneFileSystem o3FS = (OzoneFileSystem) FileSystem.get(conf);
+
+    //Let's reset the clock to control the time.
+    ((BasicOzoneClientAdapterImpl) (o3FS.getAdapter())).setClock(testClock);
+
+    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key"),
+        ReplicationType.RATIS);
+
+    bucket.setReplicationConfig(new ECReplicationConfig("rs-3-2-1024k"));
+
+    //After changing the bucket policy, it should create ec key, but o3fs will
+    // refresh after some time. So, it will still be the old type.
+    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key1"),
+        ReplicationType.RATIS);
+
+    testClock.fastForward(300 * 1000 + 1);
+
+    //After the client bucket refresh time, it should create the new type
+    // that is available on the bucket at that moment.
+    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key2"),
+        ReplicationType.EC);
+
+    // Rechecking the same steps with changing to Ratis again to check the
+    // behavior is consistent.
+    bucket.setReplicationConfig(
+        new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE));
+
+    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key3"),
+        ReplicationType.EC);
+
+    testClock.fastForward(300 * 1000 + 1);
+
+    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key4"),
+        ReplicationType.RATIS);
+  }
+
+  private void createKeyAndAssertKeyType(OzoneBucket bucket,
+      OzoneFileSystem o3FS, Path keyPath, ReplicationType expectedType)
+      throws IOException {
+    o3FS.createFile(keyPath).build().close();
+    Assert.assertEquals(expectedType.name(),
+        bucket.getKey(o3FS.pathToKey(keyPath)).getReplicationConfig()
+            .getReplicationType().name());
+  }
+
+  @Test
   public void testGetTrashRoots() throws IOException {
     String username = UserGroupInformation.getCurrentUser().getShortUserName();
     Path trashRoot = new Path(OZONE_URI_DELIMITER, TRASH_PREFIX);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index ffed22e..59dd3d9 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -2557,10 +2557,6 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       metrics.incNumBucketInfos();
       final OmBucketInfo bucketInfo =
           bucketManager.getBucketInfo(volume, bucket);
-      if (bucketInfo != null && bucketInfo
-          .getDefaultReplicationConfig() == null) {
-        bucketInfo.setDefaultReplicationConfig(getDefaultReplicationConfig());
-      }
       return bucketInfo;
     } catch (Exception ex) {
       metrics.incNumBucketInfoFails();
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index 508e22d..2dfe4f0 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -21,11 +21,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.time.Clock;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.HashSet;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.fs.BlockLocation;
@@ -49,6 +52,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.common.MonotonicClock;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -63,6 +67,7 @@ import org.apache.hadoop.ozone.om.helpers.OzoneFSUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,10 +87,15 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
   private ObjectStore objectStore;
   private OzoneVolume volume;
   private OzoneBucket bucket;
-  private ReplicationConfig replicationConfig;
+  private ReplicationConfig bucketReplicationConfig;
+  // Client side configured replication config.
+  private ReplicationConfig clientConfiguredReplicationConfig;
   private boolean securityEnabled;
   private int configuredDnPort;
   private OzoneConfiguration config;
+  private long nextReplicationConfigRefreshTime;
+  private long bucketRepConfigRefreshPeriodMS;
+  private java.time.Clock clock = new MonotonicClock(ZoneOffset.UTC);
 
   /**
    * Create new OzoneClientAdapter implementation.
@@ -110,6 +120,11 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
       throws IOException {
 
     OzoneConfiguration conf = OzoneConfiguration.of(hadoopConf);
+    bucketRepConfigRefreshPeriodMS = conf.getLong(
+        OzoneConfigKeys
+            .OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_MS,
+        OzoneConfigKeys
+            .OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_DEFAULT_MS);
     if (omHost == null && OmUtils.isServiceIdsDefined(conf)) {
       // When the host name or service id isn't given
       // but ozone.om.service.ids is defined, declare failure.
@@ -139,7 +154,8 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
       this.securityEnabled = true;
     }
 
-    replicationConfig = ReplicationConfig.getDefault(conf);
+    clientConfiguredReplicationConfig =
+        OzoneClientUtils.getClientConfiguredReplicationConfig(conf);
 
     if (OmUtils.isOmHAServiceId(conf, omHost)) {
       // omHost is listed as one of the service ids in the config,
@@ -156,6 +172,9 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
     objectStore = ozoneClient.getObjectStore();
     this.volume = objectStore.getVolume(volumeStr);
     this.bucket = volume.getBucket(bucketStr);
+    bucketReplicationConfig = this.bucket.getReplicationConfig();
+    nextReplicationConfigRefreshTime =
+        clock.millis() + bucketRepConfigRefreshPeriodMS;
 
     // resolve the bucket layout in case of Link Bucket
     BucketLayout resolvedBucketLayout =
@@ -170,15 +189,20 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
     this.config = conf;
   }
 
+  /**
+   * This API returns the value that is configured at the client side only. It
+   * could differ from the server side default values. If no replication config
+   * is configured at the client, it will return 3.
+   */
   @Override
   public short getDefaultReplication() {
-    if (replicationConfig == null) {
+    if (clientConfiguredReplicationConfig == null) {
       // to provide backward compatibility, we are just retuning 3;
       // However we need to handle with the correct behavior.
       // TODO: Please see HDDS-5646
       return (short) ReplicationFactor.THREE.getValue();
     }
-    return (short) replicationConfig.getRequiredNodes();
+    return (short) clientConfiguredReplicationConfig.getRequiredNodes();
   }
 
   @Override
@@ -211,25 +235,11 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
       boolean overWrite, boolean recursive) throws IOException {
     incrementCounter(Statistic.OBJECTS_CREATED, 1);
     try {
-      OzoneOutputStream ozoneOutputStream = null;
-      ReplicationConfig replConfig = this.replicationConfig;
-      // Since the bucket has the right default replication, we are using it.
-      if (bucket.getReplicationConfig() != null) {
-        replConfig = bucket.getReplicationConfig();
-      }
-      if (replication == ReplicationFactor.ONE.getValue()
-          || replication == ReplicationFactor.THREE.getValue()) {
-
-        ReplicationConfig customReplicationConfig =
-            ReplicationConfig.adjustReplication(
-                replConfig, replication, config);
-        ozoneOutputStream =
-            bucket.createFile(key, 0, customReplicationConfig, overWrite,
-                recursive);
-      } else {
-        ozoneOutputStream =
-            bucket.createFile(key, 0, replConfig, overWrite, recursive);
-      }
+      OzoneOutputStream ozoneOutputStream = bucket.createFile(key, 0,
+          OzoneClientUtils.resolveClientSideReplicationConfig(replication,
+              this.clientConfiguredReplicationConfig,
+              getReplicationConfigWithRefreshCheck(), config), overWrite,
+          recursive);
       return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
     } catch (OMException ex) {
       if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
@@ -242,6 +252,17 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
     }
   }
 
+  private ReplicationConfig getReplicationConfigWithRefreshCheck()
+      throws IOException {
+    if (clock.millis() > nextReplicationConfigRefreshTime) {
+      this.bucketReplicationConfig =
+          volume.getBucket(bucket.getName()).getReplicationConfig();
+      nextReplicationConfigRefreshTime =
+          clock.millis() + bucketRepConfigRefreshPeriodMS;
+    }
+    return this.bucketReplicationConfig;
+  }
+
   @Override
   public void renameKey(String key, String newKeyName) throws IOException {
     incrementCounter(Statistic.OBJECTS_RENAMED, 1);
@@ -407,6 +428,11 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
     return objectStore.getCanonicalServiceName();
   }
 
+  @VisibleForTesting
+  void setClock(Clock monotonicClock) {
+    this.clock = monotonicClock;
+  }
+
   /**
    * Ozone Delegation Token Renewer.
    */
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index b314f2e..51533bf 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -42,13 +42,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.OFSPath;
@@ -101,7 +99,7 @@ public class BasicRootedOzoneClientAdapterImpl
   private OzoneClient ozoneClient;
   private ObjectStore objectStore;
   private ClientProtocol proxy;
-  private ReplicationConfig replicationConfig;
+  private ReplicationConfig clientConfiguredReplicationConfig;
   private boolean securityEnabled;
   private int configuredDnPort;
   private BucketLayout defaultOFSBucketLayout;
@@ -171,7 +169,8 @@ public class BasicRootedOzoneClientAdapterImpl
         this.securityEnabled = true;
       }
 
-      replicationConfig = ReplicationConfig.getDefault(conf);
+      clientConfiguredReplicationConfig =
+          OzoneClientUtils.getClientConfiguredReplicationConfig(conf);
 
       if (OmUtils.isOmHAServiceId(conf, omHost)) {
         // omHost is listed as one of the service ids in the config,
@@ -288,15 +287,20 @@ public class BasicRootedOzoneClientAdapterImpl
     return bucket;
   }
 
+  /**
+   * This API returns the value that is configured at the client side only. It
+   * could differ from the server side default values. If no replication config
+   * is configured at the client, it will return 3.
+   */
   @Override
   public short getDefaultReplication() {
-    if (replicationConfig == null) {
+    if (clientConfiguredReplicationConfig == null) {
       // to provide backward compatibility, we are just retuning 3;
       // However we need to handle with the correct behavior.
       // TODO: Please see HDDS-5646
       return (short) ReplicationFactor.THREE.getValue();
     }
-    return (short) replicationConfig.getRequiredNodes();
+    return (short) clientConfiguredReplicationConfig.getRequiredNodes();
   }
 
   @Override
@@ -339,29 +343,10 @@ public class BasicRootedOzoneClientAdapterImpl
     try {
       // Hadoop CopyCommands class always sets recursive to true
       OzoneBucket bucket = getBucket(ofsPath, recursive);
-      // if client side replication config is null, we will take legacy default
-      // value at client that is RATIS.
-      ReplicationConfig replConfig = this.replicationConfig != null ?
-          this.replicationConfig :
-          new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
-      // Since the bucket has the default replication and type is EC. So, we 
are
-      // using it.
-      if (bucket.getReplicationConfig() != null && 
bucket.getReplicationConfig()
-          .getReplicationType() == HddsProtos.ReplicationType.EC) {
-        replConfig = bucket.getReplicationConfig();
-      }
-      OzoneOutputStream ozoneOutputStream = null;
-      if (replication == ReplicationFactor.ONE.getValue()
-          || replication == ReplicationFactor.THREE.getValue()) {
-
-        ozoneOutputStream = bucket.createFile(key, 0,
-            ReplicationConfig.adjustReplication(
-                replConfig, replication, config),
-            overWrite, recursive);
-      } else {
-        ozoneOutputStream =
-            bucket.createFile(key, 0, replConfig, overWrite, recursive);
-      }
+      OzoneOutputStream ozoneOutputStream = bucket.createFile(key, 0,
+          OzoneClientUtils.resolveClientSideReplicationConfig(replication,
+              this.clientConfiguredReplicationConfig,
+              bucket.getReplicationConfig(), config), overWrite, recursive);
       return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
     } catch (OMException ex) {
       if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientUtils.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientUtils.java
index 5c31917..6e94553 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientUtils.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientUtils.java
@@ -17,6 +17,12 @@
 package org.apache.hadoop.fs.ozone;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -25,6 +31,9 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import java.io.IOException;
 import java.util.Set;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS;
 
 /**
@@ -64,4 +73,78 @@ public final class OzoneClientUtils {
     }
     return bucket.getBucketLayout();
   }
+
+  /**
+   * This API used to resolve the client side configuration preference for file
+   * system layer implementations.
+   *
+   * @param replication                - replication value passed from FS API.
+   * @param clientConfiguredReplConfig - Client side configured replication
+   *                                   config.
+   * @param bucketReplConfig           - server side bucket default replication
+   *                                  config.
+   * @param config                     - Ozone configuration object.
+   * @return client resolved replication config.
+   */
+  public static ReplicationConfig resolveClientSideReplicationConfig(
+      short replication, ReplicationConfig clientConfiguredReplConfig,
+      ReplicationConfig bucketReplConfig, OzoneConfiguration config) {
+    ReplicationConfig clientDeterminedReplConfig = null;
+
+    boolean isECBucket = bucketReplConfig != null && bucketReplConfig
+        .getReplicationType() == HddsProtos.ReplicationType.EC;
+
+    // if bucket replication config configured with EC, we will give high
+    // preference to server side bucket defaults.
+    // Why we give high preference to EC is, there is no way for file system
+    // interfaces to pass EC replication. So, if one configures EC at bucket,
+    // we consider EC to take preference. In short, keys created from file
+    // system under EC bucket will always be EC'd.
+    if (isECBucket) {
+      // if bucket is EC, don't bother client provided configs, let's pass
+      // bucket config.
+      clientDeterminedReplConfig = bucketReplConfig;
+    } else {
+      // Let's validate the client side available replication configs.
+      boolean isReplicationInSupportedList =
+          (replication == ReplicationFactor.ONE
+              .getValue() || replication == 
ReplicationFactor.THREE.getValue());
+      if (isReplicationInSupportedList) {
+        if (clientConfiguredReplConfig != null) {
+          // Uses the replication(short value) passed from file system API and
+          // construct replication config object.
+          // In case the client explicitly configured EC in configurations, we
+          // always take EC as priority as EC replication can't be expressed in
+          // filesystem API.
+          clientDeterminedReplConfig = ReplicationConfig
+              .adjustReplication(clientConfiguredReplConfig, replication,
+                  config);
+        } else {
+          // In file system layers, the replication parameter is always
+          // passed. So, to respect the API provided replication value, we
+          // take RATIS as the default type.
+          clientDeterminedReplConfig = ReplicationConfig
+              .parse(ReplicationType.RATIS, Short.toString(replication),
+                  config);
+        }
+      } else {
+        // API passed replication number is not in supported replication list.
+        // So, let's use whatever available in client side configured.
+        // By default it will be null, so server will use server defaults.
+        clientDeterminedReplConfig = clientConfiguredReplConfig;
+      }
+    }
+    return clientDeterminedReplConfig;
+  }
+
+  static ReplicationConfig getClientConfiguredReplicationConfig(
+      ConfigurationSource config) {
+    String replication = config.get(OZONE_REPLICATION);
+    if (replication == null) {
+      return null;
+    }
+    return ReplicationConfig.parse(ReplicationType.valueOf(
+        config.get(OZONE_REPLICATION_TYPE, OZONE_REPLICATION_TYPE_DEFAULT)),
+        replication, config);
+  }
 }
diff --git 
a/hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
 
b/hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
new file mode 100644
index 0000000..46ab0bf
--- /dev/null
+++ 
b/hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.fs.ozone;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the behavior of OzoneClientUtils APIs.
+ */
+public class TestOzoneClientUtils {
+  private ReplicationConfig ecReplicationConfig =
+      new ECReplicationConfig("rs-3-2-1024K");
+  private ReplicationConfig ratis3ReplicationConfig =
+      new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
+  private ReplicationConfig ratis1ReplicationConfig =
+      new RatisReplicationConfig(HddsProtos.ReplicationFactor.ONE);
+
+  @Test
+  public void testResolveClientSideRepConfigWhenBucketHasEC() {
+    ReplicationConfig replicationConfig = OzoneClientUtils
+        .resolveClientSideReplicationConfig(
+            (short) 3, null,
+            ecReplicationConfig, new OzoneConfiguration());
+    // Bucket default is EC.
+    Assert.assertEquals(ecReplicationConfig, replicationConfig);
+  }
+
+  /**
+   * When bucket replication is null, it should respect the fs passed value.
+   */
+  @Test
+  public void testResolveClientSideRepConfigWhenBucketHasNull() {
+    ReplicationConfig replicationConfig = OzoneClientUtils
+        .resolveClientSideReplicationConfig(
+            (short) 3, null, null,
+            new OzoneConfiguration());
+    // Passed replication is 3 - Ozone mapped replication is ratis THREE
+    Assert.assertEquals(ratis3ReplicationConfig, replicationConfig);
+  }
+
+  /**
+   * When bucket replication is null, it should return null if the fs
+   * passed value is invalid.
+   */
+  @Test
+  public void testResolveClientSideRepConfigWhenFSPassedReplicationIsInvalid() 
{
+    ReplicationConfig replicationConfig = OzoneClientUtils
+        .resolveClientSideReplicationConfig(
+            (short) -1, null, null,
+            new OzoneConfiguration());
+    // client configured value also null.
+    // This API caller should leave the decision to server.
+    Assert.assertNull(replicationConfig);
+  }
+
+  /**
+   * When bucket default is non-EC and client side values are not valid, we
+   * would just return null, so servers can make decision in this case.
+   */
+  @Test
+  public void testResolveRepConfWhenFSPassedIsInvalidButBucketDefaultNonEC() {
+    ReplicationConfig replicationConfig = OzoneClientUtils
+        .resolveClientSideReplicationConfig(
+            (short) -1, null, ratis3ReplicationConfig,
+            new OzoneConfiguration());
+    // Configured client config also null.
+    Assert.assertNull(replicationConfig);
+  }
+
+  /**
+   * When bucket default is non-EC and the client side value is valid, we
+   * should return the client side valid value.
+   */
+  @Test
+  public void testResolveRepConfWhenFSPassedIsValidButBucketDefaultNonEC() {
+    ReplicationConfig replicationConfig = OzoneClientUtils
+        .resolveClientSideReplicationConfig(
+            (short) 1, null, ratis3ReplicationConfig,
+            new OzoneConfiguration());
+    // Passed value is replication one - Ozone mapped value is ratis ONE
+    Assert.assertEquals(ratis1ReplicationConfig, replicationConfig);
+  }
+
+  /**
+   * When bucket default is EC and client side value also valid, we would just
+   * return bucket default EC.
+   */
+  @Test
+  public void testResolveRepConfWhenFSPassedIsValidButBucketDefaultEC() {
+    ReplicationConfig replicationConfig = OzoneClientUtils
+        .resolveClientSideReplicationConfig(
+            (short) 3, ratis3ReplicationConfig,
+            ecReplicationConfig, new OzoneConfiguration());
+    // Bucket default is EC
+    Assert.assertEquals(ecReplicationConfig, replicationConfig);
+  }
+
+  /**
+   * When bucket default is non-EC and client side passed value also not valid
+   * but configured value is valid, we would just return configured value.
+   */
+  @Test
+  public void testResolveRepConfWhenFSPassedIsInvalidAndBucketDefaultNonEC() {
+    ReplicationConfig replicationConfig = OzoneClientUtils
+        .resolveClientSideReplicationConfig(
+            (short) -1, ratis3ReplicationConfig, ratis1ReplicationConfig,
+            new OzoneConfiguration());
+    // Configured value is ratis THREE
+    Assert.assertEquals(ratis3ReplicationConfig, replicationConfig);
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to