This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8e46d4e36f HDDS-9208. Add queue limit in ReplicationServer. (#5216)
8e46d4e36f is described below
commit 8e46d4e36fcb996c0c03606b1a9331ad934a95ca
Author: z-bb <[email protected]>
AuthorDate: Wed Sep 13 14:29:28 2023 +0800
HDDS-9208. Add queue limit in ReplicationServer. (#5216)
---
.../container/replication/ReplicationServer.java | 31 +++++++++++++++++++---
1 file changed, 28 insertions(+), 3 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index 657a4dbe6d..a2e0209f64 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -78,13 +78,17 @@ public class ReplicationServer {
int replicationServerWorkers =
replicationConfig.getReplicationMaxStreams();
- LOG.info("Initializing replication server with thread count = {}",
- replicationConfig.getReplicationMaxStreams());
+ int replicationQueueLimit =
+ replicationConfig.getReplicationQueueLimit();
+ LOG.info("Initializing replication server with thread count = {}"
+ + " queue length = {}",
+ replicationConfig.getReplicationMaxStreams(),
+ replicationConfig.getReplicationQueueLimit());
this.executor =
new ThreadPoolExecutor(replicationServerWorkers,
replicationServerWorkers,
60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
+ new LinkedBlockingQueue<>(replicationQueueLimit),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ReplicationContainerReader-%d")
.build());
@@ -153,6 +157,7 @@ public class ReplicationServer {
public static final String PREFIX = "hdds.datanode.replication";
public static final String STREAMS_LIMIT_KEY = "streams.limit";
+ public static final String QUEUE_LIMIT = "queue.limit";
public static final String REPLICATION_STREAMS_LIMIT_KEY =
PREFIX + "." + STREAMS_LIMIT_KEY;
@@ -180,6 +185,18 @@ public class ReplicationServer {
)
private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
+ /**
+ * The maximum of replication request queue length.
+ */
+ @Config(key = QUEUE_LIMIT,
+ type = ConfigType.INT,
+ defaultValue = "4096",
+ tags = {DATANODE},
+ description = "The maximum number of queued requests for container " +
+ "replication"
+ )
+ private int replicationQueueLimit = 4096;
+
@Config(key = "port", defaultValue = "9886",
description = "Port used for the server2server replication server",
tags = {DATANODE, MANAGEMENT})
@@ -221,6 +238,14 @@ public class ReplicationServer {
this.replicationMaxStreams = replicationMaxStreams;
}
+ public int getReplicationQueueLimit() {
+ return replicationQueueLimit;
+ }
+
+ public void setReplicationQueueLimit(int limit) {
+ this.replicationQueueLimit = limit;
+ }
+
@PostConstruct
public void validate() {
if (replicationMaxStreams < 1) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]