This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e77d52386cb branch-4.0: [Bug](udf) fix the _udf_close_workers thread pool not stop when exit #57918 (#57956)
e77d52386cb is described below
commit e77d52386cb6a5284da8f3b5998a98dac7b33905
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 13 10:02:55 2025 +0800
branch-4.0: [Bug](udf) fix the _udf_close_workers thread pool not stop when exit #57918 (#57956)
Cherry-picked from #57918
Co-authored-by: zhangstar333 <[email protected]>
---
be/src/runtime/exec_env.h | 3 +++
be/src/runtime/exec_env_init.cpp | 7 +++++++
be/src/vec/functions/function_java_udf.cpp | 18 ++++--------------
be/src/vec/functions/function_java_udf.h | 3 ---
4 files changed, 14 insertions(+), 17 deletions(-)
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 66b32648902..a7a57a86d32 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -244,6 +244,7 @@ public:
ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); }
ThreadPool* non_block_close_thread_pool();
ThreadPool* s3_file_system_thread_pool() { return _s3_file_system_thread_pool.get(); }
+ ThreadPool* udf_close_workers_pool() { return _udf_close_workers_thread_pool.get(); }
void init_file_cache_factory(std::vector<doris::CachePath>& cache_paths);
io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; }
@@ -459,6 +460,8 @@ private:
std::unique_ptr<ThreadPool> _lazy_release_obj_pool;
std::unique_ptr<ThreadPool> _non_block_close_thread_pool;
std::unique_ptr<ThreadPool> _s3_file_system_thread_pool;
+ // for java-udf to close
+ std::unique_ptr<ThreadPool> _udf_close_workers_thread_pool;
FragmentMgr* _fragment_mgr = nullptr;
WorkloadGroupMgr* _workload_group_manager = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 78bdfa34f7e..b21d63280fd 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -242,6 +242,11 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
.set_max_queue_size(config::send_batch_thread_pool_queue_size)
.build(&_send_batch_thread_pool));
+ static_cast<void>(ThreadPoolBuilder("UDFCloseWorkers")
+ .set_min_threads(4)
+ .set_max_threads(std::min(32, CpuInfo::num_cores()))
+ .build(&_udf_close_workers_thread_pool));
+
auto [buffered_reader_min_threads, buffered_reader_max_threads] =
get_num_threads(config::num_buffered_reader_prefetch_thread_pool_min_thread,
config::num_buffered_reader_prefetch_thread_pool_max_thread);
@@ -789,6 +794,7 @@ void ExecEnv::destroy() {
SAFE_SHUTDOWN(_non_block_close_thread_pool);
SAFE_SHUTDOWN(_s3_file_system_thread_pool);
SAFE_SHUTDOWN(_send_batch_thread_pool);
+ SAFE_SHUTDOWN(_udf_close_workers_thread_pool);
SAFE_SHUTDOWN(_send_table_stats_thread_pool);
SAFE_DELETE(_load_channel_mgr);
@@ -844,6 +850,7 @@ void ExecEnv::destroy() {
_buffered_reader_prefetch_thread_pool.reset(nullptr);
_s3_file_upload_thread_pool.reset(nullptr);
_send_batch_thread_pool.reset(nullptr);
+ _udf_close_workers_thread_pool.reset(nullptr);
_write_cooldown_meta_executors.reset(nullptr);
SAFE_DELETE(_broker_client_cache);
diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp
index ca024611ea2..7b4163e5c48 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -24,6 +24,7 @@
#include "common/exception.h"
#include "jni.h"
+#include "runtime/exec_env.h"
#include "runtime/user_function_cache.h"
#include "util/jni-util.h"
#include "vec/core/block.h"
@@ -35,22 +36,10 @@ const char* EXECUTOR_EVALUATE_SIGNATURE = "(Ljava/util/Map;Ljava/util/Map;)J";
const char* EXECUTOR_CLOSE_SIGNATURE = "()V";
namespace doris::vectorized {
-std::unique_ptr<ThreadPool> JavaFunctionCall::close_workers;
-std::once_flag JavaFunctionCall::close_workers_init_once;
JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes& argument_types,
const DataTypePtr& return_type)
- : fn_(fn), _argument_types(argument_types), _return_type(return_type) {
- std::call_once(close_workers_init_once, []() {
- auto build_st = ThreadPoolBuilder("UDFCloseWorkers")
- .set_min_threads(4)
- .set_max_threads(std::min(32, CpuInfo::num_cores()))
- .build(&close_workers);
- if (!build_st.ok()) {
- throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to build UDFCloseWorkers");
- }
- });
-}
+ : fn_(fn), _argument_types(argument_types), _return_type(return_type) {}
Status JavaFunctionCall::open(FunctionContext* context,
FunctionContext::FunctionStateScope scope) {
JNIEnv* env = nullptr;
@@ -156,7 +145,8 @@ Status JavaFunctionCall::close(FunctionContext* context,
// Use the close_workers pthread pool to execute the close function
auto task =
std::make_shared<std::packaged_task<Status()>>(std::move(close_func));
auto task_future = task->get_future();
- RETURN_IF_ERROR(close_workers->submit_func([task]() { (*task)(); }));
+ RETURN_IF_ERROR(ExecEnv::GetInstance()->udf_close_workers_pool()->submit_func(
+ [task]() { (*task)(); }));
RETURN_IF_ERROR(task_future.get());
return Status::OK();
}
diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h
index 74e53aec2ed..f0d41e73740 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -110,9 +110,6 @@ private:
const DataTypes _argument_types;
const DataTypePtr _return_type;
- static std::unique_ptr<ThreadPool> close_workers;
- static std::once_flag close_workers_init_once;
-
struct JniContext {
// Do not save parent directly, because parent is in VExpr, but jni
context is in FunctionContext
// The deconstruct sequence is not determined, it will core.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]