This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f46e050af2 HDDS-8626. Config thread pool in ReplicationServer (#4715)
f46e050af2 is described below

commit f46e050af2ddc924d78d5fece77da362f91b0a52
Author: hao guo <[email protected]>
AuthorDate: Wed May 17 04:21:52 2023 +0800

    HDDS-8626. Config thread pool in ReplicationServer (#4715)
---
 .../container/replication/ReplicationServer.java   | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index 5472a24be7..ee1faf8917 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -18,8 +18,11 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigType;
@@ -62,6 +65,8 @@ public class ReplicationServer {
   private int port;
   private final ContainerImporter importer;
 
+  private ThreadPoolExecutor executor;
+
   public ReplicationServer(ContainerController controller,
       ReplicationConfig replicationConfig, SecurityConfig secConf,
       CertificateClient caClient, ContainerImporter importer) {
@@ -70,6 +75,18 @@ public class ReplicationServer {
     this.controller = controller;
     this.importer = importer;
     this.port = replicationConfig.getPort();
+
+    int replicationServerWorkers =
+        replicationConfig.getReplicationMaxStreams();
+    this.executor =
+        new ThreadPoolExecutor(replicationServerWorkers,
+            replicationServerWorkers,
+            60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(),
+            new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("ReplicationContainerReader-%d")
+                .build());
+
     init();
   }
 
@@ -79,7 +96,8 @@ public class ReplicationServer {
         .addService(ServerInterceptors.intercept(new GrpcReplicationService(
             new OnDemandContainerReplicationSource(controller),
             importer
-        ), new GrpcServerInterceptor()));
+        ), new GrpcServerInterceptor()))
+        .executor(executor);
 
     if (secConf.isSecurityEnabled() && secConf.isGrpcTlsEnabled()) {
       try {
@@ -112,6 +130,8 @@ public class ReplicationServer {
 
   public void stop() {
     try {
+      executor.shutdown();
+      executor.awaitTermination(5L, TimeUnit.SECONDS);
       server.shutdown().awaitTermination(10L, TimeUnit.SECONDS);
     } catch (InterruptedException ex) {
       LOG.warn("{} couldn't be stopped gracefully", 
getClass().getSimpleName());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to