This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7a74f845eca [fix](java udf) fix possible deadlock when udf closed in
bprc (#55302)
7a74f845eca is described below
commit 7a74f845eca494366b7f5b0c22cfedb9f4a0d835
Author: Fangyuan Deng <[email protected]>
AuthorDate: Mon Sep 29 11:23:32 2025 +0800
[fix](java udf) fix possible deadlock when udf closed in bprc (#55302)
### What problem does this PR solve?
Related PR: https://github.com/apache/doris/pull/30082
Problem Summary:
We run Doris 3.0.4 with Java UDFs. After running for a long time, some BEs
hit JVM deadlocks reported with UNKNOWN_owner_addr.
[deadlock in udf close]
<img width="2314" height="882" alt="image"
src="https://github.com/user-attachments/assets/c16b5d18-9900-45dc-8bb4-894180694a44"
/>
[deadlock in throwableToStackTrace, waiting on a StringWriter, which should
be impossible because a new StringWriter is created on every call]
<img width="1724" height="720" alt="image"
src="https://github.com/user-attachments/assets/b586f752-0ba8-43cc-89b4-352a7d9ab00c"
/>
<img width="618" height="144" alt="image"
src="https://github.com/user-attachments/assets/20c2ad11-beef-4b0c-b2a2-343d9b0fd14c"
/>
In be.WARNING we found that getting the JNIEnv failed when closing the UDF:
<img width="1659" height="426" alt="image"
src="https://github.com/user-attachments/assets/6cf54aa2-522a-407e-a53a-e17fdbc9162e"
/>
In be.out we found libhdfs error logs [on x86, the JNIEnv is obtained
through libhdfs]:
Call to AttachCurrentThread failed with error: -1
getJNIEnv: getGlobalJNIEnv failed
be.WARNING also contains other errors raised while using GetJniExceptionMsg:
<img width="1686" height="530" alt="image"
src="https://github.com/user-attachments/assets/35ca7b4c-0a9d-4da3-8be2-9a07ccebb74b"
/>
All of these errors have two things in common:
1. every case occurred in JavaFunctionCall::close
2. every stack contains the bthread keyword
After searching the web, we found that JNI is not compatible with
bthread:
https://blog.csdn.net/qq_46104835/article/details/139360911
https://gitee.com/baidu/BRPC/blob/master/docs/cn/server.md#pthread%E6%A8%A1%E5%BC%8F
<img width="1101" height="211" alt="image"
src="https://github.com/user-attachments/assets/e1da2f9a-9e37-47d9-b0a1-c1652da93e75"
/>
The bthread stack layout is not compatible with Java, so we may hit either
of the following:
1. AttachCurrentThread [used by getJNIEnv] fails its stack layout check
2. AttachCurrentThread/getJNIEnv succeeds, but the Java method then fails to
execute, or even deadlocks, because the stack layout has addresses that are
incompatible with the JVM
After we switched from bthread to pthread mode [by adding
-usercode_in_pthread=true], everything worked fine.
To learn how often JavaFunctionCall::close runs in a bthread, we added a
metric. Only about 1 in 10000 JavaFunctionCall::close calls runs in a bthread:
<img width="1886" height="358" alt="image"
src="https://github.com/user-attachments/assets/34293bac-696b-41f9-b5eb-4daaab81fb7f"
/>
But why does JavaFunctionCall::close run in a bthread at all? [Since
https://github.com/apache/doris/issues/16634, exec_plan_fragment runs in a
pthread instead of a bthread.]
We then found this PR:
https://github.com/apache/doris/pull/30082
There, ExchangeSinkBuffer<Parent>::_send_rpc sets a send_callback that holds
a weak_task_ctx:
<img width="1392" height="254" alt="image"
src="https://github.com/user-attachments/assets/a1d28f18-c3c7-4953-baab-d1109f341aa1"
/>
So send_callback may sometimes call weak_task_ctx.lock() to obtain a
shared_ptr to task_ctx. If that shared_ptr is the last owner when it is
released, the task_ctx destructor is called inside send_callback
[send_callback runs in a bthread].
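The ownership hand-off described above can be reproduced with the standard library alone. The following is a minimal sketch, not Doris code: `TaskCtx` and `destructor_runs_in_callback_thread` are hypothetical names, and a plain `std::thread` stands in for the bthread that runs send_callback. It shows that whichever thread releases the last `shared_ptr` is the thread that runs the destructor.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <thread>

// Hypothetical stand-in for the task context: records the thread that destroys it.
struct TaskCtx {
    explicit TaskCtx(std::thread::id* out) : destroyed_on(out) {}
    ~TaskCtx() { *destroyed_on = std::this_thread::get_id(); }
    std::thread::id* destroyed_on;
};

// Returns true when the destructor ran on the callback thread
// and not on the thread that originally owned the context.
bool destructor_runs_in_callback_thread() {
    std::thread::id destroyed_on;
    std::thread::id callback_id;
    auto owner = std::make_shared<TaskCtx>(&destroyed_on);
    std::weak_ptr<TaskCtx> weak = owner;

    std::promise<void> locked;
    std::promise<void> released;
    std::future<void> locked_f = locked.get_future();
    std::future<void> released_f = released.get_future();

    std::thread callback([&] {
        callback_id = std::this_thread::get_id();
        auto strong = weak.lock();  // the callback now co-owns the context
        locked.set_value();
        released_f.wait();          // meanwhile the original owner lets go
        // `strong` is the last owner; it is destroyed when this lambda
        // returns, so ~TaskCtx() runs on this thread.
    });

    locked_f.wait();
    owner.reset();                  // the owner thread drops its reference first
    released.set_value();
    callback.join();

    return destroyed_on == callback_id &&
           destroyed_on != std::this_thread::get_id();
}
```

In the scenario from this PR, the destructor chain ends in JavaFunctionCall::close, which is how JNI calls end up being made from a bthread.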
So we modify JavaFunctionCall::close:
when JavaFunctionCall::close runs in a bthread, we submit the JNI
operation to a pthread pool and wait for it to finish.
Because only about 1 in 10000 JavaFunctionCall::close calls runs in a
bthread, the pthread pool can be kept small.
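The offload-and-wait pattern can be sketched with the standard library as follows. This is an illustration under stated assumptions, not the code in this commit: `run_on_pthread_if_needed`, `this_thread_id`, and `answer` are hypothetical names, the `in_unsafe_context` flag stands in for the `bthread_self() != 0` check, and a one-off `std::thread` replaces the shared pthread pool that the patch uses.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

// Hypothetical helper: runs `fn` inline when the caller is already on a safe
// thread; otherwise runs it on a separate OS thread and blocks for the result.
template <typename Fn>
auto run_on_pthread_if_needed(bool in_unsafe_context, Fn fn) -> decltype(fn()) {
    if (!in_unsafe_context) {
        return fn();
    }
    std::packaged_task<decltype(fn())()> task(std::move(fn));
    auto result = task.get_future();
    std::thread worker(std::move(task));  // assumption: one thread per call, no pool
    worker.join();                        // wait for the offloaded work to finish
    return result.get();                  // hand the result back to the caller
}

// Small helpers used to observe where the work actually ran.
std::thread::id this_thread_id() { return std::this_thread::get_id(); }
int answer() { return 42; }
```

The caller still blocks until the work is done, so the observable behavior of close is unchanged; only the thread that touches the JVM differs. A pool, as used in the patch, avoids paying thread creation cost on every offloaded call.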
---
be/src/util/doris_metrics.cpp | 4 +++
be/src/util/doris_metrics.h | 2 ++
be/src/vec/functions/function_java_udf.cpp | 46 ++++++++++++++++++++++++------
be/src/vec/functions/function_java_udf.h | 3 ++
4 files changed, 47 insertions(+), 8 deletions(-)
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 60c16fbf9a9..60d67462d54 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -225,6 +225,8 @@
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_total, MetricUnit::OPERAT
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_cache,
MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_remote,
MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(udf_close_bthread_count,
MetricUnit::OPERATIONS);
+
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
@@ -388,6 +390,8 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
num_io_bytes_read_from_cache);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
num_io_bytes_read_from_remote);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
udf_close_bthread_count);
+
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_ctx_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index ddd79fc1ed2..92f556ce573 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -244,6 +244,8 @@ public:
IntCounter* num_io_bytes_read_from_cache = nullptr;
IntCounter* num_io_bytes_read_from_remote = nullptr;
+ IntCounter* udf_close_bthread_count = nullptr;
+
IntCounter* query_ctx_cnt = nullptr;
IntCounter* scanner_ctx_cnt = nullptr;
IntCounter* scanner_cnt = nullptr;
diff --git a/be/src/vec/functions/function_java_udf.cpp
b/be/src/vec/functions/function_java_udf.cpp
index 65f3056140d..ca024611ea2 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -17,9 +17,12 @@
#include "vec/functions/function_java_udf.h"
+#include <bthread/bthread.h>
+
#include <memory>
#include <string>
+#include "common/exception.h"
#include "jni.h"
#include "runtime/user_function_cache.h"
#include "util/jni-util.h"
@@ -32,9 +35,22 @@ const char* EXECUTOR_EVALUATE_SIGNATURE =
"(Ljava/util/Map;Ljava/util/Map;)J";
const char* EXECUTOR_CLOSE_SIGNATURE = "()V";
namespace doris::vectorized {
+std::unique_ptr<ThreadPool> JavaFunctionCall::close_workers;
+std::once_flag JavaFunctionCall::close_workers_init_once;
+
JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes&
argument_types,
const DataTypePtr& return_type)
- : fn_(fn), _argument_types(argument_types), _return_type(return_type)
{}
+ : fn_(fn), _argument_types(argument_types), _return_type(return_type) {
+ std::call_once(close_workers_init_once, []() {
+ auto build_st = ThreadPoolBuilder("UDFCloseWorkers")
+ .set_min_threads(4)
+ .set_max_threads(std::min(32,
CpuInfo::num_cores()))
+ .build(&close_workers);
+ if (!build_st.ok()) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to build
UDFCloseWorkers");
+ }
+ });
+}
Status JavaFunctionCall::open(FunctionContext* context,
FunctionContext::FunctionStateScope scope) {
JNIEnv* env = nullptr;
@@ -122,13 +138,27 @@ Status JavaFunctionCall::execute_impl(FunctionContext*
context, Block& block,
Status JavaFunctionCall::close(FunctionContext* context,
FunctionContext::FunctionStateScope scope) {
- auto* jni_ctx = reinterpret_cast<JniContext*>(
- context->get_function_state(FunctionContext::THREAD_LOCAL));
- // JNIContext own some resource and its release method depend on
JavaFunctionCall
- // has to release the resource before JavaFunctionCall is deconstructed.
- if (jni_ctx) {
- RETURN_IF_ERROR(jni_ctx->close());
+ auto close_func = [context]() {
+ auto* jni_ctx = reinterpret_cast<JniContext*>(
+ context->get_function_state(FunctionContext::THREAD_LOCAL));
+ // JNIContext own some resource and its release method depend on
JavaFunctionCall
+ // has to release the resource before JavaFunctionCall is
deconstructed.
+ if (jni_ctx) {
+ RETURN_IF_ERROR(jni_ctx->close());
+ }
+ return Status::OK();
+ };
+
+ if (bthread_self() == 0) {
+ return close_func();
+ } else {
+ DorisMetrics::instance()->udf_close_bthread_count->increment(1);
+ // Use the close_workers pthread pool to execute the close function
+ auto task =
std::make_shared<std::packaged_task<Status()>>(std::move(close_func));
+ auto task_future = task->get_future();
+ RETURN_IF_ERROR(close_workers->submit_func([task]() { (*task)(); }));
+ RETURN_IF_ERROR(task_future.get());
+ return Status::OK();
}
- return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/functions/function_java_udf.h
b/be/src/vec/functions/function_java_udf.h
index f0d41e73740..74e53aec2ed 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -110,6 +110,9 @@ private:
const DataTypes _argument_types;
const DataTypePtr _return_type;
+ static std::unique_ptr<ThreadPool> close_workers;
+ static std::once_flag close_workers_init_once;
+
struct JniContext {
// Do not save parent directly, because parent is in VExpr, but jni
context is in FunctionContext
// The deconstruct sequence is not determined, it will core.