This is an automated email from the ASF dual-hosted git repository.
lide pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 9dc67a2f412 [improvement](grace stop)BE grace stop improvement (#27690)
9dc67a2f412 is described below
commit 9dc67a2f41293c0d51471e42d4ade5b222193b3a
Author: huanghg1994 <[email protected]>
AuthorDate: Thu Jul 18 10:14:13 2024 +0800
[improvement](grace stop)BE grace stop improvement (#27690)
---
be/src/common/config.h | 4 ++++
be/src/runtime/exec_env_init.cpp | 2 +-
be/src/runtime/thread_context.h | 4 +++-
be/src/service/doris_main.cpp | 20 ++++++++++++++++++++
4 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 660865c36e1..507c369e996 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -932,6 +932,10 @@ CONF_Int32(primary_key_data_page_size, "32768");
// the max package size be thrift server can receive,avoid accepting error or too large package causing OOM,default 20M
CONF_Int32(be_thrift_max_pkg_bytes, "20000000");
+// grace stop time limit, exit() will be called if be does not stop within this time limit
+// default value is 0, which means there is no time limit and be will wait until it is successfully stopped
+CONF_Int32(grace_shutdown_wait_seconds, "0");
+
#ifdef BE_TEST
// test s3
CONF_String(test_s3_resource, "resource");
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index f91233dae04..f078e236398 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -365,6 +365,7 @@ void ExecEnv::_destroy() {
_deregister_metrics();
SAFE_DELETE(_internal_client_cache);
SAFE_DELETE(_function_client_cache);
+ SAFE_DELETE(_routine_load_task_executor);
SAFE_DELETE(_load_stream_mgr);
SAFE_DELETE(_load_channel_mgr);
SAFE_DELETE(_broker_mgr);
@@ -385,7 +386,6 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_result_queue_mgr);
SAFE_DELETE(_stream_mgr);
SAFE_DELETE(_stream_load_executor);
- SAFE_DELETE(_routine_load_task_executor);
SAFE_DELETE(_external_scan_context_mgr);
SAFE_DELETE(_heartbeat_flags);
SAFE_DELETE(_scanner_scheduler);
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 0a4a676bb8e..d8e20b2b0fe 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -218,7 +218,9 @@ public:
// The brpc server should respond as quickly as possible.
bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
// set the data so that next time bthread_getspecific in the thread returns the data.
- CHECK((0 == bthread_setspecific(btls_key, bthread_context)) || doris::k_doris_exit);
+ if (!doris::k_doris_exit) {
+ CHECK(0 == bthread_setspecific(btls_key, bthread_context));
+ }
thread_context_ptr.init = true;
}
bthread_id = bthread_self();
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index a02d290e0d6..75f3414dae5 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -60,6 +60,7 @@
#include "util/doris_metrics.h"
#include "util/perf_counters.h"
#include "util/telemetry/telemetry.h"
+#include "util/thread.h"
#include "util/thrift_rpc_helper.h"
#include "util/thrift_server.h"
#include "util/uid_util.h"
@@ -250,6 +251,15 @@ void check_required_instructions() {
}
}
+void forceShutdown() {
+ if (doris::config::grace_shutdown_wait_seconds > 0) {
+ sleep(doris::config::grace_shutdown_wait_seconds);
+ LOG(WARNING) << "Doris BE grace stop time over "
+ << doris::config::grace_shutdown_wait_seconds << ", force shutdown!";
+ exit(-1);
+ }
+}
+
struct Checker {
Checker() { check_required_instructions(); }
} checker
@@ -500,6 +510,16 @@ int main(int argc, char** argv) {
sleep(10);
}
+ scoped_refptr<doris::Thread> force_shutdown_thread;
+ if (doris::config::grace_shutdown_wait_seconds > 0) {
+ LOG(INFO) << "If Doris BE can't be closed successfully within "
+ << doris::config::grace_shutdown_wait_seconds
+ << " seconds, it will be force shutdown";
+ doris::Thread::create(
+ "doris_main", "force_shutdown_thread", []() { forceShutdown(); },
+ &force_shutdown_thread);
+ }
+
http_service.stop();
brpc_service.join();
if (doris::config::enable_single_replica_load) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]