This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 950e667c5ff [chore](be) Support config max message size for be thrift
server #36467 (#36591)
950e667c5ff is described below
commit 950e667c5ff07a0da7d7307a8ea6aa3a184cd181
Author: walter <[email protected]>
AuthorDate: Thu Jun 20 17:54:49 2024 +0800
[chore](be) Support config max message size for be thrift server #36467
(#36591)
---
be/src/common/config.cpp | 3 ++
be/src/common/config.h | 3 ++
be/src/runtime/snapshot_loader.cpp | 2 +-
be/src/util/thrift_server.cpp | 66 +++++++++++++++++++++++++-------------
4 files changed, 51 insertions(+), 23 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 34181f4d256..604535825fb 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -240,6 +240,9 @@ DEFINE_mInt32(thrift_connect_timeout_seconds, "3");
DEFINE_mInt32(fetch_rpc_timeout_seconds, "30");
// default thrift client retry interval (in milliseconds)
DEFINE_mInt64(thrift_client_retry_interval_ms, "1000");
+// max message size of thrift request
+// default: 100 * 1024 * 1024
+DEFINE_mInt64(thrift_max_message_size, "104857600");
// max row count number for single scan range, used in segmentv1
DEFINE_mInt32(doris_scan_range_row_count, "524288");
// max bytes number for single scan range, used in segmentv2
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7665b4866dd..6e7f2ff490a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -285,6 +285,9 @@ DECLARE_mInt32(thrift_connect_timeout_seconds);
DECLARE_mInt32(fetch_rpc_timeout_seconds);
// default thrift client retry interval (in milliseconds)
DECLARE_mInt64(thrift_client_retry_interval_ms);
+// max message size of thrift request
+// default: 100 * 1024 * 1024
+DECLARE_mInt64(thrift_max_message_size);
// max row count number for single scan range, used in segmentv1
DECLARE_mInt32(doris_scan_range_row_count);
// max bytes number for single scan range, used in segmentv2
diff --git a/be/src/runtime/snapshot_loader.cpp
b/be/src/runtime/snapshot_loader.cpp
index da22a7c9167..7c2c68de3dd 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -93,7 +93,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type,
const std::string& l
RETURN_IF_ERROR(io::BrokerFileSystem::create(_broker_addr, _prop,
&fs));
_remote_fs = std::move(fs);
} else {
- return Status::InternalError("Unknown storage tpye: {}", type);
+ return Status::InternalError("Unknown storage type: {}", type);
}
return Status::OK();
}
diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp
index 3bd25ab61f3..2d753c58918 100644
--- a/be/src/util/thrift_server.cpp
+++ b/be/src/util/thrift_server.cpp
@@ -34,6 +34,7 @@
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <condition_variable>
+#include <memory>
#include <mutex>
#include <sstream>
#include <thread>
@@ -59,6 +60,28 @@
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_current_connections, MetricUnit::CONNE
DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(thrift_connections_total,
MetricUnit::CONNECTIONS,
"Total connections made over the lifetime
of this server");
+// Nonblocking Server socket implementation of TNonblockingServerTransport.
Wrapper around a unix
+// socket listen and accept calls.
+class ImprovedNonblockingServerSocket : public
apache::thrift::transport::TNonblockingServerSocket {
+ using TConfiguration = apache::thrift::TConfiguration;
+ using TSocket = apache::thrift::transport::TSocket;
+
+public:
+ // Constructor.
+ ImprovedNonblockingServerSocket(int port)
+ : TNonblockingServerSocket(port),
+
config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {}
+ ~ImprovedNonblockingServerSocket() override = default;
+
+protected:
+ std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET clientSocket) override
{
+ return std::make_shared<TSocket>(clientSocket, config);
+ }
+
+private:
+ std::shared_ptr<TConfiguration> config;
+};
+
// Helper class that starts a server in a separate thread, and handles
// the inter-thread communication to monitor whether it started
// correctly.
@@ -69,26 +92,26 @@ public:
: _thrift_server(thrift_server), _signal_fired(false) {}
// friendly to code style
- virtual ~ThriftServerEventProcessor() {}
+ ~ThriftServerEventProcessor() override = default;
// Called by TNonBlockingServer when server has acquired its resources and
is ready to
// serve, and signals to StartAndWaitForServer that start-up is finished.
// From TServerEventHandler.
- virtual void preServe();
+ void preServe() override;
// Called when a client connects; we create per-client state and call any
// SessionHandlerIf handler.
- virtual void*
createContext(std::shared_ptr<apache::thrift::protocol::TProtocol> input,
-
std::shared_ptr<apache::thrift::protocol::TProtocol> output);
+ void* createContext(std::shared_ptr<apache::thrift::protocol::TProtocol>
input,
+ std::shared_ptr<apache::thrift::protocol::TProtocol>
output) override;
// Called when a client starts an RPC; we set the thread-local session key.
- virtual void processContext(void* context,
-
std::shared_ptr<apache::thrift::transport::TTransport> output);
+ void processContext(void* context,
+ std::shared_ptr<apache::thrift::transport::TTransport>
output) override;
// Called when a client disconnects; we call any SessionHandlerIf handler.
- virtual void deleteContext(void* serverContext,
-
std::shared_ptr<apache::thrift::protocol::TProtocol> input,
-
std::shared_ptr<apache::thrift::protocol::TProtocol> output);
+ void deleteContext(void* serverContext,
+ std::shared_ptr<apache::thrift::protocol::TProtocol>
input,
+ std::shared_ptr<apache::thrift::protocol::TProtocol>
output) override;
// Waits for a timeout of TIMEOUT_MS for a server to signal that it has
started
// correctly.
@@ -126,8 +149,8 @@ Status
ThriftServer::ThriftServerEventProcessor::start_and_wait_for_server() {
std::unique_lock<std::mutex> lock(_signal_lock);
_thrift_server->_started = false;
- _thrift_server->_server_thread.reset(
- new
std::thread(&ThriftServer::ThriftServerEventProcessor::supervise, this));
+ _thrift_server->_server_thread = std::make_unique<std::thread>(
+ &ThriftServer::ThriftServerEventProcessor::supervise, this);
// Loop protects against spurious wakeup. Locks provide necessary fences
to ensure
// visibility.
@@ -315,7 +338,7 @@ Status ThriftServer::start() {
std::shared_ptr<apache::thrift::transport::TServerTransport>
fe_server_transport;
std::shared_ptr<apache::thrift::transport::TTransportFactory>
transport_factory;
std::shared_ptr<apache::thrift::transport::TNonblockingServerSocket>
socket =
-
std::make_shared<apache::thrift::transport::TNonblockingServerSocket>(_port);
+ std::make_shared<ImprovedNonblockingServerSocket>(_port);
if (_server_type != THREADED) {
thread_mgr =
apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(
_num_worker_threads);
@@ -329,40 +352,39 @@ Status ThriftServer::start() {
switch (_server_type) {
case NON_BLOCKING:
- if (transport_factory.get() == nullptr) {
+ if (transport_factory == nullptr) {
transport_factory.reset(new
apache::thrift::transport::TTransportFactory());
}
- _server.reset(new apache::thrift::server::TNonblockingServer(
+ _server = std::make_unique<apache::thrift::server::TNonblockingServer>(
_processor, transport_factory, transport_factory,
protocol_factory,
- protocol_factory, socket, thread_mgr));
+ protocol_factory, socket, thread_mgr);
break;
case THREAD_POOL:
fe_server_transport.reset(new apache::thrift::transport::TServerSocket(
BackendOptions::get_service_bind_address_without_bracket(),
_port));
- if (transport_factory.get() == nullptr) {
+ if (transport_factory == nullptr) {
transport_factory.reset(new
apache::thrift::transport::TBufferedTransportFactory());
}
- _server.reset(new apache::thrift::server::TThreadPoolServer(
- _processor, fe_server_transport, transport_factory,
protocol_factory, thread_mgr));
+ _server = std::make_unique<apache::thrift::server::TThreadPoolServer>(
+ _processor, fe_server_transport, transport_factory,
protocol_factory, thread_mgr);
break;
case THREADED:
server_socket = new apache::thrift::transport::TServerSocket(
BackendOptions::get_service_bind_address_without_bracket(),
_port);
- // server_socket->setAcceptTimeout(500);
fe_server_transport.reset(server_socket);
- if (transport_factory.get() == nullptr) {
+ if (transport_factory == nullptr) {
transport_factory.reset(new
apache::thrift::transport::TBufferedTransportFactory());
}
- _server.reset(new apache::thrift::server::TThreadedServer(
+ _server = std::make_unique<apache::thrift::server::TThreadedServer>(
_processor, fe_server_transport, transport_factory,
protocol_factory,
- thread_factory));
+ thread_factory);
break;
default:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]