This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f46e050af2 HDDS-8626. Config thread pool in ReplicationServer (#4715)
f46e050af2 is described below
commit f46e050af2ddc924d78d5fece77da362f91b0a52
Author: hao guo <[email protected]>
AuthorDate: Wed May 17 04:21:52 2023 +0800
HDDS-8626. Config thread pool in ReplicationServer (#4715)
---
.../container/replication/ReplicationServer.java | 22 +++++++++++++++++++++-
1 file changed, 21 insertions(+), 1 deletion(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index 5472a24be7..ee1faf8917 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.ozone.container.replication;
import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
@@ -62,6 +65,8 @@ public class ReplicationServer {
private int port;
private final ContainerImporter importer;
+ private ThreadPoolExecutor executor;
+
public ReplicationServer(ContainerController controller,
ReplicationConfig replicationConfig, SecurityConfig secConf,
CertificateClient caClient, ContainerImporter importer) {
@@ -70,6 +75,18 @@ public class ReplicationServer {
this.controller = controller;
this.importer = importer;
this.port = replicationConfig.getPort();
+
+ int replicationServerWorkers =
+ replicationConfig.getReplicationMaxStreams();
+ this.executor =
+ new ThreadPoolExecutor(replicationServerWorkers,
+ replicationServerWorkers,
+ 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("ReplicationContainerReader-%d")
+ .build());
+
init();
}
@@ -79,7 +96,8 @@ public class ReplicationServer {
.addService(ServerInterceptors.intercept(new GrpcReplicationService(
new OnDemandContainerReplicationSource(controller),
importer
- ), new GrpcServerInterceptor()));
+ ), new GrpcServerInterceptor()))
+ .executor(executor);
if (secConf.isSecurityEnabled() && secConf.isGrpcTlsEnabled()) {
try {
@@ -112,6 +130,8 @@ public class ReplicationServer {
public void stop() {
try {
+ executor.shutdown();
+ executor.awaitTermination(5L, TimeUnit.SECONDS);
server.shutdown().awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
LOG.warn("{} couldn't be stopped gracefully",
getClass().getSimpleName());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]