This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e46d4e36f HDDS-9208. Add queue limit in ReplicationServer. (#5216)
8e46d4e36f is described below

commit 8e46d4e36fcb996c0c03606b1a9331ad934a95ca
Author: z-bb <[email protected]>
AuthorDate: Wed Sep 13 14:29:28 2023 +0800

    HDDS-9208. Add queue limit in ReplicationServer. (#5216)
---
 .../container/replication/ReplicationServer.java   | 31 +++++++++++++++++++---
 1 file changed, 28 insertions(+), 3 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index 657a4dbe6d..a2e0209f64 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -78,13 +78,17 @@ public class ReplicationServer {
 
     int replicationServerWorkers =
         replicationConfig.getReplicationMaxStreams();
-    LOG.info("Initializing replication server with thread count = {}",
-        replicationConfig.getReplicationMaxStreams());
+    int replicationQueueLimit =
+        replicationConfig.getReplicationQueueLimit();
+    LOG.info("Initializing replication server with thread count = {}"
+        + " queue length = {}",
+        replicationConfig.getReplicationMaxStreams(),
+        replicationConfig.getReplicationQueueLimit());
     this.executor =
         new ThreadPoolExecutor(replicationServerWorkers,
             replicationServerWorkers,
             60, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(),
+            new LinkedBlockingQueue<>(replicationQueueLimit),
             new ThreadFactoryBuilder().setDaemon(true)
                 .setNameFormat("ReplicationContainerReader-%d")
                 .build());
@@ -153,6 +157,7 @@ public class ReplicationServer {
 
     public static final String PREFIX = "hdds.datanode.replication";
     public static final String STREAMS_LIMIT_KEY = "streams.limit";
+    public static final String QUEUE_LIMIT = "queue.limit";
 
     public static final String REPLICATION_STREAMS_LIMIT_KEY =
         PREFIX + "." + STREAMS_LIMIT_KEY;
@@ -180,6 +185,18 @@ public class ReplicationServer {
     )
     private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;
 
+    /**
+     * The maximum of replication request queue length.
+     */
+    @Config(key = QUEUE_LIMIT,
+        type = ConfigType.INT,
+        defaultValue = "4096",
+        tags = {DATANODE},
+        description = "The maximum number of queued requests for container " +
+            "replication"
+    )
+    private int replicationQueueLimit = 4096;
+
     @Config(key = "port", defaultValue = "9886",
         description = "Port used for the server2server replication server",
         tags = {DATANODE, MANAGEMENT})
@@ -221,6 +238,14 @@ public class ReplicationServer {
       this.replicationMaxStreams = replicationMaxStreams;
     }
 
+    public int getReplicationQueueLimit() {
+      return replicationQueueLimit;
+    }
+
+    public void setReplicationQueueLimit(int limit) {
+      this.replicationQueueLimit = limit;
+    }
+
     @PostConstruct
     public void validate() {
       if (replicationMaxStreams < 1) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to