This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 58cc1dca7f5 [improve](fe) Support to config max msg/frame size of the
thrift server (#36594)
58cc1dca7f5 is described below
commit 58cc1dca7f562c20536b191331ef2c38afe8debf
Author: walter <[email protected]>
AuthorDate: Fri Jun 21 00:15:15 2024 +0800
[improve](fe) Support to config max msg/frame size of the thrift server
(#36594)
Cherry-pick #35845
---
.../main/java/org/apache/doris/common/Config.java | 10 ++++
.../java/org/apache/doris/common/ThriftServer.java | 61 ++++++++++++++++++----
2 files changed, 60 insertions(+), 11 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index f8ff3cd5d47..6adf03c56cd 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -405,6 +405,16 @@ public class Config extends ConfigBase {
"The connection timeout of thrift client, in milliseconds. 0 means
no timeout."})
public static int thrift_client_timeout_ms = 0;
+ // The default value is inherited from org.apache.thrift.TConfiguration
+ @ConfField(description = {"thrift server 接收请求大小的上限",
+ "The maximum size of a (received) message of the thrift server, in
bytes"})
+ public static int thrift_max_message_size = 100 * 1024 * 1024;
+
+ // The default value is inherited from org.apache.thrift.TConfiguration
+ @ConfField(description = {"thrift server transport 接收的每帧数据大小的上限",
+ "The limits of the size of one frame of thrift server transport"})
+ public static int thrift_max_frame_size = 16384000;
+
@ConfField(description = {"thrift server 的 backlog 数量。"
+ "如果调大这个值,则需同时调整 /proc/sys/net/core/somaxconn 的值",
"The backlog number of thrift server. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
index 2396dc95074..f18dbb378a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
@@ -23,6 +23,7 @@ import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
@@ -31,10 +32,13 @@ import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
@@ -98,8 +102,9 @@ public class ThriftServer {
private void createThreadedServer() throws TTransportException {
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(
- new TNonblockingServerSocket(port,
Config.thrift_client_timeout_ms)).protocolFactory(
- new TBinaryProtocol.Factory()).processor(processor);
+ new TNonblockingServerSocket(port,
Config.thrift_client_timeout_ms))
+ .protocolFactory(new TBinaryProtocol.Factory())
+ .processor(processor);
ThreadPoolExecutor threadPoolExecutor =
ThreadPoolManager.newDaemonCacheThreadPool(
Config.thrift_server_max_worker_threads, "thrift-server-pool",
true);
args.executorService(threadPoolExecutor);
@@ -111,19 +116,19 @@ public class ThriftServer {
if (FrontendOptions.isBindIPV6()) {
socketTransportArgs = new TServerSocket.ServerSocketTransportArgs()
- .bindAddr(new InetSocketAddress("::0", port))
- .clientTimeout(Config.thrift_client_timeout_ms)
- .backlog(Config.thrift_backlog_num);
+ .bindAddr(new InetSocketAddress("::0", port))
+ .clientTimeout(Config.thrift_client_timeout_ms)
+ .backlog(Config.thrift_backlog_num);
} else {
socketTransportArgs = new TServerSocket.ServerSocketTransportArgs()
- .bindAddr(new InetSocketAddress("0.0.0.0", port))
- .clientTimeout(Config.thrift_client_timeout_ms)
- .backlog(Config.thrift_backlog_num);
+ .bindAddr(new InetSocketAddress("0.0.0.0", port))
+ .clientTimeout(Config.thrift_client_timeout_ms)
+ .backlog(Config.thrift_backlog_num);
}
- TThreadPoolServer.Args serverArgs =
- new TThreadPoolServer.Args(new
TServerSocket(socketTransportArgs)).protocolFactory(
- new TBinaryProtocol.Factory()).processor(processor);
+ TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(new
ImprovedTServerSocket(socketTransportArgs))
+ .protocolFactory(new TBinaryProtocol.Factory())
+ .processor(processor);
ThreadPoolExecutor threadPoolExecutor =
ThreadPoolManager.newDaemonCacheThreadPool(
Config.thrift_server_max_worker_threads, "thrift-server-pool",
true);
serverArgs.executorService(threadPoolExecutor);
@@ -175,4 +180,38 @@ public class ThriftServer {
public void removeConnect(TNetworkAddress clientAddress) {
connects.remove(clientAddress);
}
+
+ static class ImprovedTServerSocket extends TServerSocket {
+ public ImprovedTServerSocket(ServerSocketTransportArgs args) throws
TTransportException {
+ super(args);
+ }
+
+ public TSocket accept() throws TTransportException {
+ ServerSocket serverSocket = getServerSocket();
+ if (serverSocket == null) {
+ throw new TTransportException(TTransportException.NOT_OPEN,
"No underlying server socket.");
+ }
+
+ Socket result;
+ try {
+ result = serverSocket.accept();
+ } catch (Exception e) {
+ throw new TTransportException(e);
+ }
+ if (result == null) {
+ throw new TTransportException("Blocking server's accept() may
not return NULL");
+ }
+
+ TSocket socket = new TSocket(result);
+
+ TConfiguration cfg = socket.getConfiguration();
+ cfg.setMaxMessageSize(Config.thrift_max_message_size);
+ cfg.setMaxFrameSize(Config.thrift_max_frame_size);
+
+ socket.updateKnownMessageSize(0); // Since we update the
configuration, reset consumed message size.
+ socket.setTimeout(Config.thrift_client_timeout_ms);
+
+ return socket;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]