This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3c3a396bb66 branch-4.0: [fix](java udf) fix possible deadlock when udf
closed in brpc #55302 (#57944)
3c3a396bb66 is described below
commit 3c3a396bb669a13914082495c095a3bdcd1a3364
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 12 17:49:11 2025 +0800
branch-4.0: [fix](java udf) fix possible deadlock when udf closed in brpc
#55302 (#57944)
Cherry-picked from #55302
Co-authored-by: Fangyuan Deng <[email protected]>
---
be/src/util/doris_metrics.cpp | 4 +++
be/src/util/doris_metrics.h | 2 ++
be/src/vec/functions/function_java_udf.cpp | 46 ++++++++++++++++++++++++------
be/src/vec/functions/function_java_udf.h | 3 ++
4 files changed, 47 insertions(+), 8 deletions(-)
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 8020c873e74..0c3a713aab7 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -226,6 +226,8 @@
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_total, MetricUnit::OPERAT
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_cache,
MetricUnit::OPERATIONS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(num_io_bytes_read_from_remote,
MetricUnit::OPERATIONS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(udf_close_bthread_count,
MetricUnit::OPERATIONS);
+
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
@@ -393,6 +395,8 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
num_io_bytes_read_from_cache);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
num_io_bytes_read_from_remote);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
udf_close_bthread_count);
+
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_ctx_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 2a400d181c3..6e1c3e1a30e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -244,6 +244,8 @@ public:
IntCounter* num_io_bytes_read_from_cache = nullptr;
IntCounter* num_io_bytes_read_from_remote = nullptr;
+ IntCounter* udf_close_bthread_count = nullptr;
+
IntCounter* query_ctx_cnt = nullptr;
IntCounter* scanner_ctx_cnt = nullptr;
IntCounter* scanner_cnt = nullptr;
diff --git a/be/src/vec/functions/function_java_udf.cpp
b/be/src/vec/functions/function_java_udf.cpp
index 65f3056140d..ca024611ea2 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -17,9 +17,12 @@
#include "vec/functions/function_java_udf.h"
+#include <bthread/bthread.h>
+
#include <memory>
#include <string>
+#include "common/exception.h"
#include "jni.h"
#include "runtime/user_function_cache.h"
#include "util/jni-util.h"
@@ -32,9 +35,22 @@ const char* EXECUTOR_EVALUATE_SIGNATURE =
"(Ljava/util/Map;Ljava/util/Map;)J";
const char* EXECUTOR_CLOSE_SIGNATURE = "()V";
namespace doris::vectorized {
+std::unique_ptr<ThreadPool> JavaFunctionCall::close_workers;
+std::once_flag JavaFunctionCall::close_workers_init_once;
+
JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes&
argument_types,
const DataTypePtr& return_type)
- : fn_(fn), _argument_types(argument_types), _return_type(return_type)
{}
+ : fn_(fn), _argument_types(argument_types), _return_type(return_type) {
+ std::call_once(close_workers_init_once, []() {
+ auto build_st = ThreadPoolBuilder("UDFCloseWorkers")
+ .set_min_threads(4)
+ .set_max_threads(std::min(32,
CpuInfo::num_cores()))
+ .build(&close_workers);
+ if (!build_st.ok()) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to build
UDFCloseWorkers");
+ }
+ });
+}
Status JavaFunctionCall::open(FunctionContext* context,
FunctionContext::FunctionStateScope scope) {
JNIEnv* env = nullptr;
@@ -122,13 +138,27 @@ Status JavaFunctionCall::execute_impl(FunctionContext*
context, Block& block,
Status JavaFunctionCall::close(FunctionContext* context,
FunctionContext::FunctionStateScope scope) {
- auto* jni_ctx = reinterpret_cast<JniContext*>(
- context->get_function_state(FunctionContext::THREAD_LOCAL));
- // JNIContext own some resource and its release method depend on
JavaFunctionCall
- // has to release the resource before JavaFunctionCall is deconstructed.
- if (jni_ctx) {
- RETURN_IF_ERROR(jni_ctx->close());
+ auto close_func = [context]() {
+ auto* jni_ctx = reinterpret_cast<JniContext*>(
+ context->get_function_state(FunctionContext::THREAD_LOCAL));
+ // JNIContext own some resource and its release method depend on
JavaFunctionCall
+ // has to release the resource before JavaFunctionCall is
deconstructed.
+ if (jni_ctx) {
+ RETURN_IF_ERROR(jni_ctx->close());
+ }
+ return Status::OK();
+ };
+
+ if (bthread_self() == 0) {
+ return close_func();
+ } else {
+ DorisMetrics::instance()->udf_close_bthread_count->increment(1);
+ // Use the close_workers pthread pool to execute the close function
+ auto task =
std::make_shared<std::packaged_task<Status()>>(std::move(close_func));
+ auto task_future = task->get_future();
+ RETURN_IF_ERROR(close_workers->submit_func([task]() { (*task)(); }));
+ RETURN_IF_ERROR(task_future.get());
+ return Status::OK();
}
- return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/functions/function_java_udf.h
b/be/src/vec/functions/function_java_udf.h
index f0d41e73740..74e53aec2ed 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -110,6 +110,9 @@ private:
const DataTypes _argument_types;
const DataTypePtr _return_type;
+ static std::unique_ptr<ThreadPool> close_workers;
+ static std::once_flag close_workers_init_once;
+
struct JniContext {
// Do not save parent directly, because parent is in VExpr, but jni
context is in FunctionContext
// The deconstruct sequence is not determined, it will core.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]