This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 81f1364e31 HDDS-8532. Add config for factor of scaling up replication queue/threads in decommissioning nodes (#4687)
81f1364e31 is described below

commit 81f1364e317255aea83fa44f026012208664f63b
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue May 9 20:42:33 2023 +0200

    HDDS-8532. Add config for factor of scaling up replication queue/threads in decommissioning nodes (#4687)
---
 .../container/replication/ReplicationServer.java   | 40 ++++++++++++++++++++++
 .../replication/ReplicationSupervisor.java         |  5 +--
 .../replication/TestReplicationConfig.java         | 14 ++++++++
 .../container/replication/ReplicationManager.java  |  9 ++++-
 4 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index cdc1975360..5472a24be7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
 import static org.apache.hadoop.hdds.conf.ConfigTag.MANAGEMENT;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
 
 /**
  * Separated network server for server2server container replication.
@@ -135,6 +136,14 @@ public class ReplicationServer {
         PREFIX + "." + STREAMS_LIMIT_KEY;
 
     public static final int REPLICATION_MAX_STREAMS_DEFAULT = 10;
+    private static final String OUTOFSERVICE_FACTOR_KEY =
+        "outofservice.limit.factor";
+    private static final double OUTOFSERVICE_FACTOR_MIN = 1;
+    static final double OUTOFSERVICE_FACTOR_DEFAULT = 2;
+    private static final String OUTOFSERVICE_FACTOR_DEFAULT_VALUE = "2.0";
+    private static final double OUTOFSERVICE_FACTOR_MAX = 10;
+    static final String REPLICATION_OUTOFSERVICE_FACTOR_KEY =
+        PREFIX + "." + OUTOFSERVICE_FACTOR_KEY;
 
     /**
      * The maximum number of replication commands a single datanode can execute
@@ -154,6 +163,25 @@ public class ReplicationServer {
         tags = {DATANODE, MANAGEMENT})
     private int port;
 
+    @Config(key = OUTOFSERVICE_FACTOR_KEY,
+        type = ConfigType.DOUBLE,
+        defaultValue = OUTOFSERVICE_FACTOR_DEFAULT_VALUE,
+        tags = {DATANODE, SCM},
+        description = "Decommissioning and maintenance nodes can handle more " +
+            "replication commands than in-service nodes due to reduced load. " +
+            "This multiplier determines the increased queue capacity and " +
+            "executor pool size."
+    )
+    private double outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT;
+
+    public double getOutOfServiceFactor() {
+      return outOfServiceFactor;
+    }
+
+    public int scaleOutOfServiceLimit(int original) {
+      return (int) Math.ceil(original * outOfServiceFactor);
+    }
+
     public int getPort() {
       return port;
     }
@@ -179,6 +207,18 @@ public class ReplicationServer {
             replicationMaxStreams, REPLICATION_MAX_STREAMS_DEFAULT);
         replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
       }
+
+      if (outOfServiceFactor < OUTOFSERVICE_FACTOR_MIN ||
+          outOfServiceFactor > OUTOFSERVICE_FACTOR_MAX) {
+        LOG.warn(
+            "{} must be between {} and {} but was set to {}. Defaulting to {}",
+            REPLICATION_OUTOFSERVICE_FACTOR_KEY,
+            OUTOFSERVICE_FACTOR_MIN,
+            OUTOFSERVICE_FACTOR_MAX,
+            outOfServiceFactor,
+            OUTOFSERVICE_FACTOR_DEFAULT);
+        outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT;
+      }
     }
 
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 63d5e2b0c2..51109846df 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -288,8 +288,9 @@ public final class ReplicationSupervisor {
       int newMaxQueueSize = datanodeConfig.getCommandQueueLimit();
 
       if (isMaintenance(newState) || isDecommission(newState)) {
-        threadCount *= 2;
-        newMaxQueueSize *= 2;
+        threadCount = replicationConfig.scaleOutOfServiceLimit(threadCount);
+        newMaxQueueSize =
+            replicationConfig.scaleOutOfServiceLimit(newMaxQueueSize);
       }
 
       LOG.info("Node state updated to {}, scaling executor pool size to {}",
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java
index bbdea62dda..3adde368db 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationConfig.java
@@ -21,7 +21,9 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.OUTOFSERVICE_FACTOR_DEFAULT;
 import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_MAX_STREAMS_DEFAULT;
+import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_OUTOFSERVICE_FACTOR_KEY;
 import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -34,22 +36,30 @@ public class TestReplicationConfig {
   public void acceptsValidValues() {
     // GIVEN
     int validReplicationLimit = 123;
+    double validOutOfServiceFactor = 3.0;
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, validReplicationLimit);
+    conf.setDouble(REPLICATION_OUTOFSERVICE_FACTOR_KEY,
+        validOutOfServiceFactor);
 
     // WHEN
     ReplicationConfig subject = conf.getObject(ReplicationConfig.class);
 
     // THEN
     assertEquals(validReplicationLimit, subject.getReplicationMaxStreams());
+    assertEquals(validOutOfServiceFactor, subject.getOutOfServiceFactor(),
+        0.001);
   }
 
   @Test
   public void overridesInvalidValues() {
     // GIVEN
     int invalidReplicationLimit = -5;
+    double invalidOutOfServiceFactor = 0.5;
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(REPLICATION_STREAMS_LIMIT_KEY, invalidReplicationLimit);
+    conf.setDouble(REPLICATION_OUTOFSERVICE_FACTOR_KEY,
+        invalidOutOfServiceFactor);
 
     // WHEN
     ReplicationConfig subject = conf.getObject(ReplicationConfig.class);
@@ -57,6 +67,8 @@ public class TestReplicationConfig {
     // THEN
     assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
         subject.getReplicationMaxStreams());
+    assertEquals(OUTOFSERVICE_FACTOR_DEFAULT,
+        subject.getOutOfServiceFactor(), 0.001);
   }
 
   @Test
@@ -70,6 +82,8 @@ public class TestReplicationConfig {
     // THEN
     assertEquals(REPLICATION_MAX_STREAMS_DEFAULT,
         subject.getReplicationMaxStreams());
+    assertEquals(OUTOFSERVICE_FACTOR_DEFAULT,
+        subject.getOutOfServiceFactor(), 0.001);
   }
 
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 88bd10985e..8e10b4ab87 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.container.replication.ReplicationServer;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
@@ -121,6 +122,10 @@ public class ReplicationManager implements SCMService {
    * ReplicationManager specific configuration.
    */
   private final ReplicationManagerConfiguration rmConf;
+  /**
+   * Datanodes' replication configuration.
+   */
+  private final ReplicationServer.ReplicationConfig replicationServerConf;
   private final NodeManager nodeManager;
 
   /**
@@ -226,6 +231,8 @@ public class ReplicationManager implements SCMService {
     this.containerManager = containerManager;
     this.scmContext = scmContext;
     this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
+    this.replicationServerConf =
+        conf.getObject(ReplicationServer.ReplicationConfig.class);
     this.running = false;
     this.clock = clock;
     this.containerReport = new ReplicationManagerReport();
@@ -1490,7 +1497,7 @@ public class ReplicationManager implements SCMService {
     HddsProtos.NodeOperationalState state = datanode.getPersistedOpState();
     int limit = datanodeReplicationLimit;
     if (isMaintenance(state) || isDecommission(state)) {
-      limit *= 2;
+      limit = replicationServerConf.scaleOutOfServiceLimit(limit);
     }
     return limit;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to