This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3fd43353fd [ISSUE #7393] Add timeout configuration for grpc server (#7394)
3fd43353fd is described below

commit 3fd43353fdf880deb5d63ba3ad50cc6e3259dc3a
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Tue Sep 26 13:53:51 2023 +0800

    [ISSUE #7393] Add timeout configuration for grpc server (#7394)
    
    * Add timeout configuration for grpc server
    
    * Add proxyConfig
---
 .../main/java/org/apache/rocketmq/proxy/ProxyStartup.java    |  1 +
 .../java/org/apache/rocketmq/proxy/config/ProxyConfig.java   |  9 +++++++++
 .../main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java | 10 ++++++++--
 .../org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java    | 12 +++++++++++-
 4 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index 06d5f4525f..3b2ca99bfd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -85,6 +85,7 @@ public class ProxyStartup {
                 .addService(ChannelzService.newInstance(100))
                 .addService(ProtoReflectionService.newInstance())
                 .configInterceptor(accessValidators)
+                .shutdownTime(ConfigurationManager.getProxyConfig().getGrpcShutdownTimeSeconds(), TimeUnit.SECONDS)
                 .build();
             PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);
 
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index b2478fec3a..c0d00d8640 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -87,6 +87,7 @@ public class ProxyConfig implements ConfigFile {
      */
     private String proxyMode = ProxyMode.CLUSTER.name();
     private Integer grpcServerPort = 8081;
+    private long grpcShutdownTimeSeconds = 30;
     private int grpcBossLoopNum = 1;
     private int grpcWorkerLoopNum = PROCESSOR_NUMBER * 2;
     private boolean enableGrpcEpoll = false;
@@ -443,6 +444,14 @@ public class ProxyConfig implements ConfigFile {
         this.grpcServerPort = grpcServerPort;
     }
 
+    public long getGrpcShutdownTimeSeconds() {
+        return grpcShutdownTimeSeconds;
+    }
+
+    public void setGrpcShutdownTimeSeconds(long grpcShutdownTimeSeconds) {
+        this.grpcShutdownTimeSeconds = grpcShutdownTimeSeconds;
+    }
+
     public boolean isUseEndpointPortFromRequest() {
         return useEndpointPortFromRequest;
     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
index 1bffa3c0be..d5b896fe14 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
@@ -29,8 +29,14 @@ public class GrpcServer implements StartAndShutdown {
 
     private final Server server;
 
-    protected GrpcServer(Server server) {
+    private final long timeout;
+
+    private final TimeUnit unit;
+
+    protected GrpcServer(Server server, long timeout, TimeUnit unit) {
         this.server = server;
+        this.timeout = timeout;
+        this.unit = unit;
     }
 
     public void start() throws Exception {
@@ -40,7 +46,7 @@ public class GrpcServer implements StartAndShutdown {
 
     public void shutdown() {
         try {
-            this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+            this.server.shutdown().awaitTermination(timeout, unit);
             log.info("grpc server shutdown successfully.");
         } catch (Exception e) {
             e.printStackTrace();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index 9cddd30137..0e79006f6b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -41,6 +41,10 @@ public class GrpcServerBuilder {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
     protected NettyServerBuilder serverBuilder;
 
+    protected long time = 30;
+
+    protected TimeUnit unit = TimeUnit.SECONDS;
+
     public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port) {
         return new GrpcServerBuilder(executor, port);
     }
@@ -77,6 +81,12 @@ public class GrpcServerBuilder {
             port, bossLoopNum, workerLoopNum, maxInboundMessageSize);
     }
 
+    public GrpcServerBuilder shutdownTime(long time, TimeUnit unit) {
+        this.time = time;
+        this.unit = unit;
+        return this;
+    }
+
     public GrpcServerBuilder addService(BindableService service) {
         this.serverBuilder.addService(service);
         return this;
@@ -93,7 +103,7 @@ public class GrpcServerBuilder {
     }
 
     public GrpcServer build() {
-        return new GrpcServer(this.serverBuilder.build());
+        return new GrpcServer(this.serverBuilder.build(), time, unit);
     }
 
     public GrpcServerBuilder configInterceptor(List<AccessValidator> accessValidators) {

Reply via email to