This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e77d52386cb branch-4.0: [Bug](udf) fix the _udf_close_workers thread pool not stop when exit #57918 (#57956)
e77d52386cb is described below

commit e77d52386cb6a5284da8f3b5998a98dac7b33905
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 13 10:02:55 2025 +0800

    branch-4.0: [Bug](udf) fix the _udf_close_workers thread pool not stop when exit #57918 (#57956)
    
    Cherry-picked from #57918
    
    Co-authored-by: zhangstar333 <[email protected]>
---
 be/src/runtime/exec_env.h                  |  3 +++
 be/src/runtime/exec_env_init.cpp           |  7 +++++++
 be/src/vec/functions/function_java_udf.cpp | 18 ++++--------------
 be/src/vec/functions/function_java_udf.h   |  3 ---
 4 files changed, 14 insertions(+), 17 deletions(-)

diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 66b32648902..a7a57a86d32 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -244,6 +244,7 @@ public:
     ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); 
}
     ThreadPool* non_block_close_thread_pool();
     ThreadPool* s3_file_system_thread_pool() { return 
_s3_file_system_thread_pool.get(); }
+    ThreadPool* udf_close_workers_pool() { return 
_udf_close_workers_thread_pool.get(); }
 
     void init_file_cache_factory(std::vector<doris::CachePath>& cache_paths);
     io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; }
@@ -459,6 +460,8 @@ private:
     std::unique_ptr<ThreadPool> _lazy_release_obj_pool;
     std::unique_ptr<ThreadPool> _non_block_close_thread_pool;
     std::unique_ptr<ThreadPool> _s3_file_system_thread_pool;
+    // for java-udf to close
+    std::unique_ptr<ThreadPool> _udf_close_workers_thread_pool;
 
     FragmentMgr* _fragment_mgr = nullptr;
     WorkloadGroupMgr* _workload_group_manager = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 78bdfa34f7e..b21d63280fd 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -242,6 +242,11 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
                               
.set_max_queue_size(config::send_batch_thread_pool_queue_size)
                               .build(&_send_batch_thread_pool));
 
+    static_cast<void>(ThreadPoolBuilder("UDFCloseWorkers")
+                              .set_min_threads(4)
+                              .set_max_threads(std::min(32, 
CpuInfo::num_cores()))
+                              .build(&_udf_close_workers_thread_pool));
+
     auto [buffered_reader_min_threads, buffered_reader_max_threads] =
             
get_num_threads(config::num_buffered_reader_prefetch_thread_pool_min_thread,
                             
config::num_buffered_reader_prefetch_thread_pool_max_thread);
@@ -789,6 +794,7 @@ void ExecEnv::destroy() {
     SAFE_SHUTDOWN(_non_block_close_thread_pool);
     SAFE_SHUTDOWN(_s3_file_system_thread_pool);
     SAFE_SHUTDOWN(_send_batch_thread_pool);
+    SAFE_SHUTDOWN(_udf_close_workers_thread_pool);
     SAFE_SHUTDOWN(_send_table_stats_thread_pool);
 
     SAFE_DELETE(_load_channel_mgr);
@@ -844,6 +850,7 @@ void ExecEnv::destroy() {
     _buffered_reader_prefetch_thread_pool.reset(nullptr);
     _s3_file_upload_thread_pool.reset(nullptr);
     _send_batch_thread_pool.reset(nullptr);
+    _udf_close_workers_thread_pool.reset(nullptr);
     _write_cooldown_meta_executors.reset(nullptr);
 
     SAFE_DELETE(_broker_client_cache);
diff --git a/be/src/vec/functions/function_java_udf.cpp 
b/be/src/vec/functions/function_java_udf.cpp
index ca024611ea2..7b4163e5c48 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -24,6 +24,7 @@
 
 #include "common/exception.h"
 #include "jni.h"
+#include "runtime/exec_env.h"
 #include "runtime/user_function_cache.h"
 #include "util/jni-util.h"
 #include "vec/core/block.h"
@@ -35,22 +36,10 @@ const char* EXECUTOR_EVALUATE_SIGNATURE = 
"(Ljava/util/Map;Ljava/util/Map;)J";
 const char* EXECUTOR_CLOSE_SIGNATURE = "()V";
 
 namespace doris::vectorized {
-std::unique_ptr<ThreadPool> JavaFunctionCall::close_workers;
-std::once_flag JavaFunctionCall::close_workers_init_once;
 
 JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes& 
argument_types,
                                    const DataTypePtr& return_type)
-        : fn_(fn), _argument_types(argument_types), _return_type(return_type) {
-    std::call_once(close_workers_init_once, []() {
-        auto build_st = ThreadPoolBuilder("UDFCloseWorkers")
-                                .set_min_threads(4)
-                                .set_max_threads(std::min(32, 
CpuInfo::num_cores()))
-                                .build(&close_workers);
-        if (!build_st.ok()) {
-            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to build 
UDFCloseWorkers");
-        }
-    });
-}
+        : fn_(fn), _argument_types(argument_types), _return_type(return_type) 
{}
 
 Status JavaFunctionCall::open(FunctionContext* context, 
FunctionContext::FunctionStateScope scope) {
     JNIEnv* env = nullptr;
@@ -156,7 +145,8 @@ Status JavaFunctionCall::close(FunctionContext* context,
         // Use the close_workers pthread pool to execute the close function
         auto task = 
std::make_shared<std::packaged_task<Status()>>(std::move(close_func));
         auto task_future = task->get_future();
-        RETURN_IF_ERROR(close_workers->submit_func([task]() { (*task)(); }));
+        
RETURN_IF_ERROR(ExecEnv::GetInstance()->udf_close_workers_pool()->submit_func(
+                [task]() { (*task)(); }));
         RETURN_IF_ERROR(task_future.get());
         return Status::OK();
     }
diff --git a/be/src/vec/functions/function_java_udf.h 
b/be/src/vec/functions/function_java_udf.h
index 74e53aec2ed..f0d41e73740 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -110,9 +110,6 @@ private:
     const DataTypes _argument_types;
     const DataTypePtr _return_type;
 
-    static std::unique_ptr<ThreadPool> close_workers;
-    static std::once_flag close_workers_init_once;
-
     struct JniContext {
         // Do not save parent directly, because parent is in VExpr, but jni 
context is in FunctionContext
         // The deconstruct sequence is not determined, it will core.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to