This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3c3a396bb66 branch-4.0: [fix](java udf) fix possible deadlock when udf closed in brpc #55302 (#57944)
3c3a396bb66 is described below

commit 3c3a396bb669a13914082495c095a3bdcd1a3364
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 12 17:49:11 2025 +0800

    branch-4.0: [fix](java udf) fix possible deadlock when udf closed in brpc #55302 (#57944)
    
    Cherry-picked from #55302
    
    Co-authored-by: Fangyuan Deng <[email protected]>
---
 be/src/util/doris_metrics.cpp              |  4 +++
 be/src/util/doris_metrics.h                |  2 ++
 be/src/vec/functions/function_java_udf.cpp | 46 ++++++++++++++++++++++++------
 be/src/vec/functions/function_java_udf.h   |  3 ++
 4 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 8020c873e74..0c3a713aab7 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -226,6 +226,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_total, MetricUnit::OPERAT
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_cache, MetricUnit::OPERATIONS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_remote, MetricUnit::OPERATIONS);
 
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(udf_close_bthread_count, MetricUnit::OPERATIONS);
+
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
@@ -393,6 +395,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_cache);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_remote);
 
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, udf_close_bthread_count);
+
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_ctx_cnt);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 2a400d181c3..6e1c3e1a30e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -244,6 +244,8 @@ public:
     IntCounter* num_io_bytes_read_from_cache = nullptr;
     IntCounter* num_io_bytes_read_from_remote = nullptr;
 
+    IntCounter* udf_close_bthread_count = nullptr;
+
     IntCounter* query_ctx_cnt = nullptr;
     IntCounter* scanner_ctx_cnt = nullptr;
     IntCounter* scanner_cnt = nullptr;
diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp
index 65f3056140d..ca024611ea2 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -17,9 +17,12 @@
 
 #include "vec/functions/function_java_udf.h"
 
+#include <bthread/bthread.h>
+
 #include <memory>
 #include <string>
 
+#include "common/exception.h"
 #include "jni.h"
 #include "runtime/user_function_cache.h"
 #include "util/jni-util.h"
@@ -32,9 +35,22 @@ const char* EXECUTOR_EVALUATE_SIGNATURE = "(Ljava/util/Map;Ljava/util/Map;)J";
 const char* EXECUTOR_CLOSE_SIGNATURE = "()V";
 
 namespace doris::vectorized {
+std::unique_ptr<ThreadPool> JavaFunctionCall::close_workers;
+std::once_flag JavaFunctionCall::close_workers_init_once;
+
 JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes& argument_types,
                                    const DataTypePtr& return_type)
-        : fn_(fn), _argument_types(argument_types), _return_type(return_type) {}
+        : fn_(fn), _argument_types(argument_types), _return_type(return_type) {
+    std::call_once(close_workers_init_once, []() {
+        auto build_st = ThreadPoolBuilder("UDFCloseWorkers")
+                                .set_min_threads(4)
+                                .set_max_threads(std::min(32, CpuInfo::num_cores()))
+                                .build(&close_workers);
+        if (!build_st.ok()) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to build UDFCloseWorkers");
+        }
+    });
+}
 
 Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
     JNIEnv* env = nullptr;
@@ -122,13 +138,27 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
 
 Status JavaFunctionCall::close(FunctionContext* context,
                                FunctionContext::FunctionStateScope scope) {
-    auto* jni_ctx = reinterpret_cast<JniContext*>(
-            context->get_function_state(FunctionContext::THREAD_LOCAL));
-    // JNIContext own some resource and its release method depend on JavaFunctionCall
-    // has to release the resource before JavaFunctionCall is deconstructed.
-    if (jni_ctx) {
-        RETURN_IF_ERROR(jni_ctx->close());
+    auto close_func = [context]() {
+        auto* jni_ctx = reinterpret_cast<JniContext*>(
+                context->get_function_state(FunctionContext::THREAD_LOCAL));
+        // JNIContext own some resource and its release method depend on JavaFunctionCall
+        // has to release the resource before JavaFunctionCall is deconstructed.
+        if (jni_ctx) {
+            RETURN_IF_ERROR(jni_ctx->close());
+        }
+        return Status::OK();
+    };
+
+    if (bthread_self() == 0) {
+        return close_func();
+    } else {
+        DorisMetrics::instance()->udf_close_bthread_count->increment(1);
+        // Use the close_workers pthread pool to execute the close function
+        auto task = std::make_shared<std::packaged_task<Status()>>(std::move(close_func));
+        auto task_future = task->get_future();
+        RETURN_IF_ERROR(close_workers->submit_func([task]() { (*task)(); }));
+        RETURN_IF_ERROR(task_future.get());
+        return Status::OK();
     }
-    return Status::OK();
 }
 } // namespace doris::vectorized
diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h
index f0d41e73740..74e53aec2ed 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -110,6 +110,9 @@ private:
     const DataTypes _argument_types;
     const DataTypePtr _return_type;
 
+    static std::unique_ptr<ThreadPool> close_workers;
+    static std::once_flag close_workers_init_once;
+
     struct JniContext {
         // Do not save parent directly, because parent is in VExpr, but jni context is in FunctionContext
         // The deconstruct sequence is not determined, it will core.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to