This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a274b6f5a HDDS-8679. Add dedicated, configurable thread pool for OM gRPC server (#4771)
1a274b6f5a is described below

commit 1a274b6f5ae5242270dfc264b3503af0eda8c680
Author: hao guo <[email protected]>
AuthorDate: Fri Jun 9 03:16:34 2023 +0800

    HDDS-8679. Add dedicated, configurable thread pool for OM gRPC server (#4771)
---
 .../common/src/main/resources/ozone-default.xml    | 28 +++++++++++
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   | 13 ++++++
 .../hadoop/ozone/om/GrpcOzoneManagerServer.java    | 54 ++++++++++++++++++++++
 3 files changed, 95 insertions(+)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index f90fea4908..74b7c0c0d0 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3525,6 +3525,34 @@
       OM/S3GATEWAY OMRequest, OMResponse over grpc max message length (bytes).
     </description>
   </property>
+
+  <property>
+    <name>ozone.om.grpc.read.thread.num</name>
+    <value>32</value>
+    <tag>OZONE, OM, S3GATEWAY</tag>
+    <description>
+      OM grpc server read thread pool core thread size.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.grpc.bossgroup.size</name>
+    <value>8</value>
+    <tag>OZONE, OM, S3GATEWAY</tag>
+    <description>
+      OM grpc server netty boss event group size.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.grpc.workergroup.size</name>
+    <value>32</value>
+    <tag>OZONE, OM, S3GATEWAY</tag>
+    <description>
+      OM grpc server netty worker event group size.
+    </description>
+  </property>
+
   <property>
     <name>ozone.default.bucket.layout</name>
     <value/>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 4442b6d061..9a7acb02f5 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -80,6 +80,19 @@ public final class OMConfigKeys {
   public static final int OZONE_OM_PORT_DEFAULT = 9862;
   public static final String OZONE_OM_GRPC_PORT_KEY =
       "ozone.om.grpc.port";
+
+  public static final String OZONE_OM_GRPC_BOSSGROUP_SIZE_KEY =
+      "ozone.om.grpc.bossgroup.size";
+  public static final int OZONE_OM_GRPC_BOSSGROUP_SIZE_DEFAULT = 8;
+
+  public static final String OZONE_OM_GRPC_WORKERGROUP_SIZE_KEY =
+      "ozone.om.grpc.workergroup.size";
+  public static final int OZONE_OM_GRPC_WORKERGROUP_SIZE_DEFAULT = 32;
+
+  public static final String OZONE_OM_GRPC_READ_THREAD_NUM_KEY =
+      "ozone.om.grpc.read.thread.num";
+  public static final int OZONE_OM_GRPC_READ_THREAD_NUM_DEFAULT = 32;
+
   public static final String OZONE_OM_HTTP_ENABLED_KEY =
       "ozone.om.http.enabled";
   public static final String OZONE_OM_HTTP_BIND_HOST_KEY =
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
index 16f8af31a7..c452bf48e4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
@@ -19,8 +19,12 @@ package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
 import java.util.OptionalInt;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.security.ssl.KeyStoresFactory;
@@ -36,6 +40,9 @@ import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.NettyServerBuilder;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslProvider;
 import io.grpc.Server;
@@ -46,8 +53,14 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_BOSSGROUP_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_BOSSGROUP_SIZE_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_READ_THREAD_NUM_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_READ_THREAD_NUM_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_WORKERGROUP_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_WORKERGROUP_SIZE_KEY;
 
 /**
  * Separated network server for gRPC transport OzoneManagerService s3g->OM.
@@ -61,6 +74,10 @@ public class GrpcOzoneManagerServer {
   private int port;
   private final int maxSize;
 
+  private ThreadPoolExecutor readExecutors;
+  private EventLoopGroup bossEventLoopGroup;
+  private EventLoopGroup workerEventLoopGroup;
+
   public GrpcOzoneManagerServer(OzoneConfiguration config,
                                 OzoneManagerProtocolServerSideTranslatorPB
                                     omTranslator,
@@ -95,8 +112,41 @@ public class GrpcOzoneManagerServer {
                    OzoneDelegationTokenSecretManager delegationTokenMgr,
                    OzoneConfiguration omServerConfig,
                    CertificateClient caClient) {
+
+    int poolSize = omServerConfig.getInt(OZONE_OM_GRPC_READ_THREAD_NUM_KEY,
+        OZONE_OM_GRPC_READ_THREAD_NUM_DEFAULT);
+
+    int bossGroupSize = omServerConfig.getInt(OZONE_OM_GRPC_BOSSGROUP_SIZE_KEY,
+        OZONE_OM_GRPC_BOSSGROUP_SIZE_DEFAULT);
+
+    int workerGroupSize =
+        omServerConfig.getInt(OZONE_OM_GRPC_WORKERGROUP_SIZE_KEY,
+            OZONE_OM_GRPC_WORKERGROUP_SIZE_DEFAULT);
+
+    readExecutors = new ThreadPoolExecutor(poolSize, poolSize,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("OmRpcReader-%d")
+            .build());
+
+    ThreadFactory bossFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("OmRpcBoss-ELG-%d")
+        .build();
+    bossEventLoopGroup = new NioEventLoopGroup(bossGroupSize, bossFactory);
+
+    ThreadFactory workerFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("OmRpcWorker-ELG-%d")
+        .build();
+    workerEventLoopGroup =
+        new NioEventLoopGroup(workerGroupSize, workerFactory);
+
     NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .maxInboundMessageSize(maxSize)
+        .bossEventLoopGroup(bossEventLoopGroup)
+        .workerEventLoopGroup(workerEventLoopGroup)
+        .channelType(NioServerSocketChannel.class)
+        .executor(readExecutors)
         .addService(ServerInterceptors.intercept(
             new OzoneManagerServiceGrpc(omTranslator,
                 delegationTokenMgr,
@@ -134,7 +184,11 @@ public class GrpcOzoneManagerServer {
 
   public void stop() {
     try {
+      readExecutors.shutdown();
+      readExecutors.awaitTermination(5L, TimeUnit.SECONDS);
       server.shutdown().awaitTermination(10L, TimeUnit.SECONDS);
+      bossEventLoopGroup.shutdownGracefully().sync();
+      workerEventLoopGroup.shutdownGracefully().sync();
       LOG.info("Server {} is shutdown", getClass().getSimpleName());
     } catch (InterruptedException ex) {
       LOG.warn("{} couldn't be stopped gracefully", getClass().getSimpleName());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to