This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a74f845eca [fix](java udf) fix possible deadlock when udf closed in bprc (#55302)
7a74f845eca is described below

commit 7a74f845eca494366b7f5b0c22cfedb9f4a0d835
Author: Fangyuan Deng <[email protected]>
AuthorDate: Mon Sep 29 11:23:32 2025 +0800

    [fix](java udf) fix possible deadlock when udf closed in bprc (#55302)
    
    ### What problem does this PR solve?
    
    Related PR: https://github.com/apache/doris/pull/30082
    
    Problem Summary:
    
    We run Doris 3.0.4 with Java UDFs. After running for a long time, some
    BEs hit JVM deadlocks with UNKNOWN_owner_addr.

    Deadlock in UDF close (screenshot):
    https://github.com/user-attachments/assets/c16b5d18-9900-45dc-8bb4-894180694a44

    Deadlock in throwableToStackTrace, waiting on a StringWriter. This should
    be impossible, because a new StringWriter is created on every call
    (screenshots):
    https://github.com/user-attachments/assets/b586f752-0ba8-43cc-89b4-352a7d9ab00c
    https://github.com/user-attachments/assets/20c2ad11-beef-4b0c-b2a2-343d9b0fd14c
    
    
    In be.WARNING we found that getting the JNIEnv failed when closing the
    UDF (screenshot):
    https://github.com/user-attachments/assets/6cf54aa2-522a-407e-a53a-e17fdbc9162e

    In be.out we found libhdfs error logs (on x86 the JNIEnv is obtained
    through libhdfs):
    Call to AttachCurrentThread failed with error: -1
    getJNIEnv: getGlobalJNIEnv failed
    
    In be.WARNING there are also other errors raised through
    GetJniExceptionMsg (screenshot):
    https://github.com/user-attachments/assets/35ca7b4c-0a9d-4da3-8be2-9a07ccebb74b
    
    All of these errors have two things in common:
    1. Every case occurred in JavaFunctionCall::close.
    2. Every stack trace contains the bthread keyword.
    
    After searching the web, we found that JNI is not compatible with
    bthread:
    https://blog.csdn.net/qq_46104835/article/details/139360911
    https://gitee.com/baidu/BRPC/blob/master/docs/cn/server.md#pthread%E6%A8%A1%E5%BC%8F
    Screenshot of the relevant brpc documentation:
    https://github.com/user-attachments/assets/e1da2f9a-9e37-47d9-b0a1-c1652da93e75
    
    The bthread stack layout is not compatible with the JVM, so we may meet
    either of the following:
    1. AttachCurrentThread (used by getJNIEnv) fails its stack layout check.
    2. AttachCurrentThread/getJNIEnv succeeds, but Java methods may fail to
    execute or even deadlock, because the stack layout has addresses that
    are incompatible with the JVM.
    
    
    We then switched from bthread to pthread mode (by adding
    -usercode_in_pthread=true), and everything worked fine.
    
    To learn how often JavaFunctionCall::close runs in a bthread, we added a
    metric. Only about 1 in 10000 JavaFunctionCall::close calls runs in a
    bthread (screenshot):
    https://github.com/user-attachments/assets/34293bac-696b-41f9-b5eb-4daaab81fb7f
    
    But why does JavaFunctionCall::close run in a bthread at all? Since
    https://github.com/apache/doris/issues/16634, exec_plan_fragment runs in
    a pthread instead of a bthread.
    
    We then found this PR:
    https://github.com/apache/doris/pull/30082

    ExchangeSinkBuffer<Parent>::_send_rpc sets a send_callback that holds a
    weak_task_ctx (screenshot):
    https://github.com/user-attachments/assets/a1d28f18-c3c7-4953-baab-d1109f341aa1
    So send_callback may sometimes call weak_task_ctx.lock() to get a
    shared_ptr to task_ctx. If that shared_ptr is the last reference when it
    is released, the task_ctx destructor is called inside send_callback,
    which runs in a bthread.
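
The ownership effect described above can be reproduced with the standard library alone. The following is a minimal sketch, not code from this commit: TaskCtx and destructor_runs_on_callback_thread are hypothetical names, and a plain std::thread stands in for the bthread that runs send_callback.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <thread>

// Hypothetical stand-in for a task context; records which thread destroyed it.
struct TaskCtx {
    explicit TaskCtx(std::thread::id* out) : destroyed_on(out) {}
    ~TaskCtx() { *destroyed_on = std::this_thread::get_id(); }
    std::thread::id* destroyed_on;
};

// Returns true when the destructor ran on the callback thread rather than
// on the thread that originally owned the object.
bool destructor_runs_on_callback_thread() {
    std::thread::id destroyed_on;
    auto owner = std::make_shared<TaskCtx>(&destroyed_on);
    std::weak_ptr<TaskCtx> weak = owner;

    std::promise<void> locked;
    std::promise<void> released;
    std::thread callback([&] {
        auto strong = weak.lock();     // callback temporarily shares ownership
        locked.set_value();
        released.get_future().wait();  // owner drops its reference meanwhile
        // `strong` is now the last reference; leaving this scope runs
        // ~TaskCtx on the callback thread.
    });
    const std::thread::id callback_id = callback.get_id();

    locked.get_future().wait();
    owner.reset();                     // not the last reference: no destructor yet
    released.set_value();
    callback.join();
    return destroyed_on == callback_id;
}
```

In this sketch the original owner never runs the destructor, even though it created the object, which mirrors how a close can end up running in whatever thread executes the callback.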
    
    So we modify JavaFunctionCall::close: when it runs in a bthread, we
    submit the JNI operation to a pthread pool and wait for it to finish.
    Because only about 1 in 10000 JavaFunctionCall::close calls runs in a
    bthread, the pthread pool can be small.
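
The submit-and-wait pattern described above can be sketched without Doris or brpc. This is an illustration under stated assumptions, not the committed implementation: Worker and run_on_worker_and_wait are hypothetical names, and a single std::thread stands in for the pthread pool.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical single-thread worker; stands in for a real pthread pool.
class Worker {
public:
    Worker() : _thread([this] { run(); }) {}
    ~Worker() {
        {
            std::lock_guard<std::mutex> lock(_mu);
            _stop = true;
        }
        _cv.notify_one();
        _thread.join();
    }
    void submit(std::function<void()> fn) {
        {
            std::lock_guard<std::mutex> lock(_mu);
            _queue.push(std::move(fn));
        }
        _cv.notify_one();
    }
    std::thread::id id() const { return _thread.get_id(); }

private:
    void run() {
        std::unique_lock<std::mutex> lock(_mu);
        while (true) {
            _cv.wait(lock, [this] { return _stop || !_queue.empty(); });
            if (_queue.empty()) return;  // only reached when stopping
            auto fn = std::move(_queue.front());
            _queue.pop();
            lock.unlock();
            fn();                        // run the task without holding the lock
            lock.lock();
        }
    }
    std::mutex _mu;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _queue;
    bool _stop = false;
    std::thread _thread;  // declared last so the other members exist before run() starts
};

// Runs fn on the worker thread and blocks the caller until it finishes.
template <typename R>
R run_on_worker_and_wait(Worker& worker, std::function<R()> fn) {
    // packaged_task is move-only; a shared_ptr makes the submitted closure
    // copyable, which std::function requires.
    auto task = std::make_shared<std::packaged_task<R()>>(std::move(fn));
    std::future<R> result = task->get_future();
    worker.submit([task] { (*task)(); });
    return result.get();
}
```

The blocking wait keeps the caller occupied until the worker finishes, so the approach fits best when the offloaded path is rare, as the 1 in 10000 figure above suggests.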
---
 be/src/util/doris_metrics.cpp              |  4 +++
 be/src/util/doris_metrics.h                |  2 ++
 be/src/vec/functions/function_java_udf.cpp | 46 ++++++++++++++++++++++++------
 be/src/vec/functions/function_java_udf.h   |  3 ++
 4 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 60c16fbf9a9..60d67462d54 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -225,6 +225,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_total, MetricUnit::OPERAT
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_cache, MetricUnit::OPERATIONS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_remote, MetricUnit::OPERATIONS);
 
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(udf_close_bthread_count, MetricUnit::OPERATIONS);
+
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
@@ -388,6 +390,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_cache);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, num_io_bytes_read_from_remote);
 
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, udf_close_bthread_count);
+
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_ctx_cnt);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index ddd79fc1ed2..92f556ce573 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -244,6 +244,8 @@ public:
     IntCounter* num_io_bytes_read_from_cache = nullptr;
     IntCounter* num_io_bytes_read_from_remote = nullptr;
 
+    IntCounter* udf_close_bthread_count = nullptr;
+
     IntCounter* query_ctx_cnt = nullptr;
     IntCounter* scanner_ctx_cnt = nullptr;
     IntCounter* scanner_cnt = nullptr;
diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp
index 65f3056140d..ca024611ea2 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -17,9 +17,12 @@
 
 #include "vec/functions/function_java_udf.h"
 
+#include <bthread/bthread.h>
+
 #include <memory>
 #include <string>
 
+#include "common/exception.h"
 #include "jni.h"
 #include "runtime/user_function_cache.h"
 #include "util/jni-util.h"
@@ -32,9 +35,22 @@ const char* EXECUTOR_EVALUATE_SIGNATURE = "(Ljava/util/Map;Ljava/util/Map;)J";
 const char* EXECUTOR_CLOSE_SIGNATURE = "()V";
 
 namespace doris::vectorized {
+std::unique_ptr<ThreadPool> JavaFunctionCall::close_workers;
+std::once_flag JavaFunctionCall::close_workers_init_once;
+
 JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes& argument_types,
                                    const DataTypePtr& return_type)
-        : fn_(fn), _argument_types(argument_types), _return_type(return_type) {}
+        : fn_(fn), _argument_types(argument_types), _return_type(return_type) {
+    std::call_once(close_workers_init_once, []() {
+        auto build_st = ThreadPoolBuilder("UDFCloseWorkers")
+                                .set_min_threads(4)
+                                .set_max_threads(std::min(32, CpuInfo::num_cores()))
+                                .build(&close_workers);
+        if (!build_st.ok()) {
+            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to build UDFCloseWorkers");
+        }
+    });
+}
 
 Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
     JNIEnv* env = nullptr;
@@ -122,13 +138,27 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
 
 Status JavaFunctionCall::close(FunctionContext* context,
                                FunctionContext::FunctionStateScope scope) {
-    auto* jni_ctx = reinterpret_cast<JniContext*>(
-            context->get_function_state(FunctionContext::THREAD_LOCAL));
-    // JNIContext own some resource and its release method depend on JavaFunctionCall
-    // has to release the resource before JavaFunctionCall is deconstructed.
-    if (jni_ctx) {
-        RETURN_IF_ERROR(jni_ctx->close());
+    auto close_func = [context]() {
+        auto* jni_ctx = reinterpret_cast<JniContext*>(
+                context->get_function_state(FunctionContext::THREAD_LOCAL));
+        // JNIContext own some resource and its release method depend on JavaFunctionCall
+        // has to release the resource before JavaFunctionCall is deconstructed.
+        if (jni_ctx) {
+            RETURN_IF_ERROR(jni_ctx->close());
+        }
+        return Status::OK();
+    };
+
+    if (bthread_self() == 0) {
+        return close_func();
+    } else {
+        DorisMetrics::instance()->udf_close_bthread_count->increment(1);
+        // Use the close_workers pthread pool to execute the close function
+        auto task = std::make_shared<std::packaged_task<Status()>>(std::move(close_func));
+        auto task_future = task->get_future();
+        RETURN_IF_ERROR(close_workers->submit_func([task]() { (*task)(); }));
+        RETURN_IF_ERROR(task_future.get());
+        return Status::OK();
     }
-    return Status::OK();
 }
 } // namespace doris::vectorized
diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h
index f0d41e73740..74e53aec2ed 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -110,6 +110,9 @@ private:
     const DataTypes _argument_types;
     const DataTypePtr _return_type;
 
+    static std::unique_ptr<ThreadPool> close_workers;
+    static std::once_flag close_workers_init_once;
+
     struct JniContext {
         // Do not save parent directly, because parent is in VExpr, but jni context is in FunctionContext
         // The deconstruct sequence is not determined, it will core.

