This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new fc99132a86 HDDS-10183. Dynamic reconfiguration of replication server
thread pool size (#6052)
fc99132a86 is described below
commit fc99132a8657c97751fb2223b11845d09b182975
Author: hao guo <[email protected]>
AuthorDate: Tue Jan 30 06:35:44 2024 +0800
HDDS-10183. Dynamic reconfiguration of replication server thread pool size
(#6052)
---
.../apache/hadoop/ozone/HddsDatanodeService.java | 13 ++++++++++-
.../ozone/container/ozoneimpl/OzoneContainer.java | 4 ++++
.../container/replication/ReplicationServer.java | 26 ++++++++++++++++++++++
.../reconfig/TestDatanodeReconfiguration.java | 16 +++++++++++++
4 files changed, 58 insertions(+), 1 deletion(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 3cd0477ffd..f59622cb0f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -82,6 +82,7 @@ import static
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_WORKERS;
import static
org.apache.hadoop.ozone.conf.OzoneServiceConfig.DEFAULT_SHUTDOWN_HOOK_PRIORITY;
import static org.apache.hadoop.ozone.common.Storage.StorageState.INITIALIZED;
+import static
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY;
import static org.apache.hadoop.security.UserGroupInformation.getCurrentUser;
import static
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX;
import static org.apache.hadoop.util.ExitUtil.terminate;
@@ -291,7 +292,9 @@ public class HddsDatanodeService extends GenericCli
implements ServicePlugin {
.register(HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX,
this::reconfigBlockDeleteThreadMax)
.register(OZONE_BLOCK_DELETING_SERVICE_WORKERS,
- this::reconfigDeletingServiceWorkers);
+ this::reconfigDeletingServiceWorkers)
+ .register(REPLICATION_STREAMS_LIMIT_KEY,
+ this::reconfigReplicationStreamsLimit);
datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
dnCertClient, secretKeyClient, this::terminateDatanode, dnCRLStore,
@@ -667,4 +670,12 @@ public class HddsDatanodeService extends GenericCli
implements ServicePlugin {
.setPoolSize(Integer.parseInt(value));
return value;
}
+
+ private String reconfigReplicationStreamsLimit(String value) {
+ getConf().set(REPLICATION_STREAMS_LIMIT_KEY, value);
+
+ getDatanodeStateMachine().getContainer().getReplicationServer()
+ .setPoolSize(Integer.parseInt(value));
+ return value;
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index f050c96a45..d4425f75a5 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -584,4 +584,8 @@ public class OzoneContainer {
return blockDeletingService;
}
+ public ReplicationServer getReplicationServer() {
+ return replicationServer;
+ }
+
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index 3feb574748..d2407a61d0 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -154,6 +155,31 @@ public class ReplicationServer {
return port;
}
+ public void setPoolSize(int size) {
+ if (size <= 0) {
+ throw new IllegalArgumentException("Pool size must be positive.");
+ }
+
+ int currentCorePoolSize = executor.getCorePoolSize();
+
+ // In ThreadPoolExecutor, maximumPoolSize must always be greater than or
+ // equal to the corePoolSize. We must make sure this invariant holds when
+ // changing the pool size. Therefore, we take into account whether the
+ // new size is greater or smaller than the current core pool size.
+ if (size > currentCorePoolSize) {
+ executor.setMaximumPoolSize(size);
+ executor.setCorePoolSize(size);
+ } else {
+ executor.setCorePoolSize(size);
+ executor.setMaximumPoolSize(size);
+ }
+ }
+
+ @VisibleForTesting
+ public ThreadPoolExecutor getExecutor() {
+ return executor;
+ }
+
/**
* Replication-related configuration.
*/
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestDatanodeReconfiguration.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestDatanodeReconfiguration.java
index c0b3d7d541..c3a38f3b5e 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestDatanodeReconfiguration.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/reconfig/TestDatanodeReconfiguration.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_WORKERS;
import static
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX;
+import static
org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig.REPLICATION_STREAMS_LIMIT_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -49,6 +50,7 @@ class TestDatanodeReconfiguration extends
ReconfigurationTestBase {
Set<String> expected = ImmutableSet.<String>builder()
.add(HDDS_DATANODE_BLOCK_DELETE_THREAD_MAX)
.add(OZONE_BLOCK_DELETING_SERVICE_WORKERS)
+ .add(REPLICATION_STREAMS_LIMIT_KEY)
.addAll(new DatanodeConfiguration().reconfigurableProperties())
.build();
@@ -91,6 +93,20 @@ class TestDatanodeReconfiguration extends
ReconfigurationTestBase {
assertEquals(newValue, executor.getCorePoolSize());
}
+ @ParameterizedTest
+ @ValueSource(ints = { -1, +1 })
+ void replicationStreamsLimit(int delta) throws ReconfigurationException {
+ ThreadPoolExecutor executor =
+ getFirstDatanode().getDatanodeStateMachine().getContainer()
+ .getReplicationServer().getExecutor();
+ int newValue = executor.getCorePoolSize() + delta;
+
+ getFirstDatanode().getReconfigurationHandler().reconfigurePropertyImpl(
+ REPLICATION_STREAMS_LIMIT_KEY, String.valueOf(newValue));
+ assertEquals(newValue, executor.getMaximumPoolSize());
+ assertEquals(newValue, executor.getCorePoolSize());
+ }
+
private HddsDatanodeService getFirstDatanode() {
return getCluster().getHddsDatanodes().get(0);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]