This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fc99132a86 HDDS-10183. Dynamic reconfiguration of replication server thread pool size (#6052)
fc99132a86 is described below

commit fc99132a8657c97751fb2223b11845d09b182975
Author: hao guo <[email protected]>
AuthorDate: Tue Jan 30 06:35:44 2024 +0800

    HDDS-10183. Dynamic reconfiguration of replication server thread pool size (#6052)
---
 .../apache/hadoop/ozone/HddsDatanodeService.java   | 13 ++++++++++-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  4 ++++
 .../container/replication/ReplicationServer.java   | 26 ++++++++++++++++++++++
 .../reconfig/TestDatanodeReconfiguration.java      | 16 +++++++++++++
 4 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 3cd0477ffd..f59622cb0f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -82,6 +82,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_WORKERS;
 import static org.apache.hadoop.ozone.conf.OzoneServiceConfig.DEFAULT_SHUTDOWN_HOOK_PRIORITY;
 import static org.apache.hadoop.ozone.common.Storage.StorageState.INITIALIZED;
+import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY;
 import static org.apache.hadoop.security.UserGroupInformation.getCurrentUser;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX;
 import static org.apache.hadoop.util.ExitUtil.terminate;
@@ -291,7 +292,9 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
               .register(HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX,
                   this::reconfigBlockDeleteThreadMax)
               .register(OZONE_BLOCK_DELETING_SERVICE_WORKERS,
-                  this::reconfigDeletingServiceWorkers);
+                  this::reconfigDeletingServiceWorkers)
+              .register(REPLICATION_STREAMS_LIMIT_KEY,
+                  this::reconfigReplicationStreamsLimit);
 
       datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
           dnCertClient, secretKeyClient, this::terminateDatanode, dnCRLStore,
@@ -667,4 +670,12 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
         .setPoolSize(Integer.parseInt(value));
     return value;
   }
+
+  private String reconfigReplicationStreamsLimit(String value) {
+    getConf().set(REPLICATION_STREAMS_LIMIT_KEY, value);
+
+    getDatanodeStateMachine().getContainer().getReplicationServer()
+        .setPoolSize(Integer.parseInt(value));
+    return value;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index f050c96a45..d4425f75a5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -584,4 +584,8 @@ public class OzoneContainer {
     return blockDeletingService;
   }
 
+  public ReplicationServer getReplicationServer() {
+    return replicationServer;
+  }
+
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index 3feb574748..d2407a61d0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -154,6 +155,31 @@ public class ReplicationServer {
     return port;
   }
 
+  public void setPoolSize(int size) {
+    if (size <= 0) {
+      throw new IllegalArgumentException("Pool size must be positive.");
+    }
+
+    int currentCorePoolSize = executor.getCorePoolSize();
+
+    // In ThreadPoolExecutor, maximumPoolSize must always be greater than or
+    // equal to the corePoolSize. We must make sure this invariant holds when
+    // changing the pool size. Therefore, we take into account whether the
+    // new size is greater or smaller than the current core pool size.
+    if (size > currentCorePoolSize) {
+      executor.setMaximumPoolSize(size);
+      executor.setCorePoolSize(size);
+    } else {
+      executor.setCorePoolSize(size);
+      executor.setMaximumPoolSize(size);
+    }
+  }
+
+  @VisibleForTesting
+  public ThreadPoolExecutor getExecutor() {
+    return executor;
+  }
+
   /**
    * Replication-related configuration.
    */
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestDatanodeReconfiguration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestDatanodeReconfiguration.java
index c0b3d7d541..c3a38f3b5e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestDatanodeReconfiguration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestDatanodeReconfiguration.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_WORKERS;
 import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX;
+import static org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -49,6 +50,7 @@ class TestDatanodeReconfiguration extends ReconfigurationTestBase {
     Set<String> expected = ImmutableSet.<String>builder()
         .add(HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX)
         .add(OZONE_BLOCK_DELETING_SERVICE_WORKERS)
+        .add(REPLICATION_STREAMS_LIMIT_KEY)
         .addAll(new DatanodeConfiguration().reconfigurableProperties())
         .build();
 
@@ -91,6 +93,20 @@ class TestDatanodeReconfiguration extends ReconfigurationTestBase {
     assertEquals(newValue, executor.getCorePoolSize());
   }
 
+  @ParameterizedTest
+  @ValueSource(ints = { -1, +1 })
+  void replicationStreamsLimit(int delta) throws ReconfigurationException {
+    ThreadPoolExecutor executor =
+        getFirstDatanode().getDatanodeStateMachine().getContainer()
+            .getReplicationServer().getExecutor();
+    int newValue = executor.getCorePoolSize() + delta;
+
+    getFirstDatanode().getReconfigurationHandler().reconfigurePropertyImpl(
+        REPLICATION_STREAMS_LIMIT_KEY, String.valueOf(newValue));
+    assertEquals(newValue, executor.getMaximumPoolSize());
+    assertEquals(newValue, executor.getCorePoolSize());
+  }
+
   private HddsDatanodeService getFirstDatanode() {
     return getCluster().getHddsDatanodes().get(0);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to