This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7d7eb73e03 [ISSUE #10003] Add gRPC maxConcurrentCallsPerConnection
Configuration to Proxy (#10004)
7d7eb73e03 is described below
commit 7d7eb73e035a7a172983396f018ce1726464528b
Author: qianye <[email protected]>
AuthorDate: Mon Mar 30 14:27:01 2026 +0800
[ISSUE #10003] Add gRPC maxConcurrentCallsPerConnection Configuration to
Proxy (#10004)
---
.../rocketmq/common/thread/ThreadPoolMonitor.java | 7 ++++---
.../apache/rocketmq/proxy/config/ProxyConfig.java | 19 +++++++++++++++++++
.../rocketmq/proxy/grpc/GrpcServerBuilder.java | 20 +++++++++++---------
.../proxy/remoting/RemotingProtocolServer.java | 13 ++++++-------
proxy/src/main/resources/rmq.proxy.logback.xml | 2 +-
5 files changed, 41 insertions(+), 20 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
index a79674568b..02acd78ba1 100644
---
a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
+++
b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
@@ -142,9 +142,10 @@ public class ThreadPoolMonitor {
List<ThreadPoolStatusMonitor> monitors =
threadPoolWrapper.getStatusPrinters();
for (ThreadPoolStatusMonitor monitor : monitors) {
double value =
monitor.value(threadPoolWrapper.getThreadPoolExecutor());
- String nameFormatted = String.format("%-40s",
threadPoolWrapper.getName());
- String descFormatted = String.format("%-12s",
monitor.describe());
- waterMarkLogger.info("{}{}{}", nameFormatted, descFormatted,
value);
+ waterMarkLogger.info("\t{}\t{}\t{}",
threadPoolWrapper.getName(),
+ monitor.describe(),
+ value);
+
if (enablePrintJstack) {
if
(monitor.needPrintJstack(threadPoolWrapper.getThreadPoolExecutor(), value) &&
System.currentTimeMillis() - jstackTime >
jstackPeriodTime) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index d44b82aff5..5a1a585930 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -95,6 +95,17 @@ public class ProxyConfig implements ConfigFile {
private boolean enableGrpcEpoll = false;
private int grpcThreadPoolNums = 16 + PROCESSOR_NUMBER * 2;
private int grpcThreadPoolQueueCapacity = 100000;
+
+ /**
+ * Maximum number of concurrent gRPC calls allowed per client connection.
+ * <p>
+ * A single client issuing excessively high concurrent requests may skew
the load balancing
+ * and overload a single proxy instance (hotspot), potentially bringing it
down. Limiting
+ * {@code grpcMaxConcurrentCallsPerConnection} helps mitigate this
per-connection hotspot risk.
+ * <p>
+ * Note: Setting this limit too low may cause send/consume failures (e.g.,
backpressure or rejected calls).
+ */
+ private int grpcMaxConcurrentCallsPerConnection = Integer.MAX_VALUE;
private String brokerConfigPath = ConfigurationManager.getProxyHome() +
"/conf/broker.conf";
/**
* gRPC max message size
@@ -1581,4 +1592,12 @@ public class ProxyConfig implements ConfigFile {
public void setReturnHandleGroupThreadPoolNums(int
returnHandleGroupThreadPoolNums) {
this.returnHandleGroupThreadPoolNums = returnHandleGroupThreadPoolNums;
}
+
+ public int getGrpcMaxConcurrentCallsPerConnection() {
+ return grpcMaxConcurrentCallsPerConnection;
+ }
+
+ public void setGrpcMaxConcurrentCallsPerConnection(int
grpcMaxConcurrentCallsPerConnection) {
+ this.grpcMaxConcurrentCallsPerConnection =
grpcMaxConcurrentCallsPerConnection;
+ }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index 163e799f41..1f012e6f40 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -24,16 +24,16 @@ import
io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
-
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
public class GrpcServerBuilder {
@@ -52,18 +52,20 @@ public class GrpcServerBuilder {
}
protected GrpcServerBuilder(ThreadPoolExecutor executor, int port,
TlsCertificateManager tlsCertificateManager) {
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
this.tlsCertificateManager = tlsCertificateManager;
- serverBuilder = NettyServerBuilder.forPort(port);
+ serverBuilder = NettyServerBuilder.forPort(port)
+
.maxConcurrentCallsPerConnection(config.getGrpcMaxConcurrentCallsPerConnection());
serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
// build server
- int bossLoopNum =
ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
- int workerLoopNum =
ConfigurationManager.getProxyConfig().getGrpcWorkerLoopNum();
- int maxInboundMessageSize =
ConfigurationManager.getProxyConfig().getGrpcMaxInboundMessageSize();
- long idleTimeMills =
ConfigurationManager.getProxyConfig().getGrpcClientIdleTimeMills();
+ int bossLoopNum = config.getGrpcBossLoopNum();
+ int workerLoopNum = config.getGrpcWorkerLoopNum();
+ int maxInboundMessageSize = config.getGrpcMaxInboundMessageSize();
+ long idleTimeMills = config.getGrpcClientIdleTimeMills();
- if (ConfigurationManager.getProxyConfig().isEnableGrpcEpoll()) {
+ if (config.isEnableGrpcEpoll()) {
serverBuilder.bossEventLoopGroup(new
EpollEventLoopGroup(bossLoopNum))
.workerEventLoopGroup(new EpollEventLoopGroup(workerLoopNum))
.channelType(EpollServerSocketChannel.class)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index a01c23fce6..c26f6bc2ef 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -19,6 +19,11 @@ package org.apache.rocketmq.proxy.remoting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.Channel;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.auth.config.AuthConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
@@ -59,12 +64,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
public class RemotingProtocolServer implements StartAndShutdown,
RemotingProxyOutClient {
private final static Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -194,7 +193,7 @@ public class RemotingProtocolServer implements
StartAndShutdown, RemotingProxyOu
this.timerExecutor = ThreadUtils.newSingleThreadScheduledExecutor(
new
ThreadFactoryBuilder().setNameFormat("RemotingServerScheduler-%d").build()
);
- this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10,
10, TimeUnit.SECONDS);
+ this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 100,
100, TimeUnit.MILLISECONDS);
this.registerRemotingServer(this.defaultRemotingServer);
}
diff --git a/proxy/src/main/resources/rmq.proxy.logback.xml
b/proxy/src/main/resources/rmq.proxy.logback.xml
index aee4cbc71b..3eccf5f023 100644
--- a/proxy/src/main/resources/rmq.proxy.logback.xml
+++ b/proxy/src/main/resources/rmq.proxy.logback.xml
@@ -52,7 +52,7 @@
<maxFileSize>128MB</maxFileSize>
</triggeringPolicy>
<encoder>
- <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %m%n</pattern>
+ <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8}%m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>