This is an automated email from the ASF dual-hosted git repository.
zclllyybb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 497cbf5c27e [Log](pyudf) Add progress logs for python process pool
init (#62974)
497cbf5c27e is described below
commit 497cbf5c27eaaecdbf2256ba6931b768ce7b5411
Author: linrrarity <[email protected]>
AuthorDate: Fri May 8 21:18:12 2026 +0800
[Log](pyudf) Add progress logs for python process pool init (#62974)
Improve observability for rare Python UDF process pool initialization
stalls.
Previously, when initialization was blocked or unusually slow, logs only
showed the start of pool creation, making it difficult to tell whether
BE was still waiting on process startup or where time was being spent.
This adds lightweight progress and elapsed-time logs to help diagnose
initialization hangs without increasing normal-case log volume.
Also reduce `max_python_process_num` from 64 to 16 in the regression-test
configs; the tests do not need that many processes.
---
be/src/udf/python/python_server.cpp | 26 ++++++++++++++++++-
be/test/udf/python/python_server_test.cpp | 17 +++++++++++++
be/test/udf/python/python_udf_runtime_test.cpp | 29 +++++++++++-----------
.../pipeline/cloud_p0/conf/be_custom.conf | 2 +-
.../pipeline/cloud_p1/conf/be_custom.conf | 2 +-
regression-test/pipeline/external/conf/be.conf | 2 +-
.../pipeline/nonConcurrent/conf/be.conf | 2 +-
regression-test/pipeline/p0/conf/be.conf | 2 +-
regression-test/pipeline/p1/conf/be.conf | 2 +-
.../pipeline/vault_p0/conf/be_custom.conf | 2 +-
10 files changed, 64 insertions(+), 22 deletions(-)
diff --git a/be/src/udf/python/python_server.cpp
b/be/src/udf/python/python_server.cpp
index 228cab8d905..7e6e4cddbc1 100644
--- a/be/src/udf/python/python_server.cpp
+++ b/be/src/udf/python/python_server.cpp
@@ -26,8 +26,10 @@
#include <boost/asio.hpp>
#include <boost/process.hpp>
+#include <chrono>
#include <fstream>
#include <future>
+#include <thread>
#include "arrow/flight/client.h"
#include "common/config.h"
@@ -127,7 +129,25 @@ PythonServerManager::_ensure_pool_initialized(const
PythonVersion& version) {
int success_count = 0;
int failure_count = 0;
+ const auto init_start_time = std::chrono::steady_clock::now();
+#ifdef BE_TEST
+ constexpr auto progress_log_interval = std::chrono::milliseconds(50);
+#else
+ constexpr auto progress_log_interval = std::chrono::seconds(20);
+#endif
for (int i = 0; i < max_pool_size; i++) {
+ // Print init log every 20s until the current slot is ready.
+ while (futures[i].wait_for(progress_log_interval) !=
std::future_status::ready) {
+ const auto now = std::chrono::steady_clock::now();
+ const auto total_elapsed_ms =
+ std::chrono::duration_cast<std::chrono::milliseconds>(now
- init_start_time)
+ .count();
+ LOG(INFO) << "Python process pool initialization progress for
version "
+ << version.to_string() << ": waiting_slot=" << (i + 1)
<< "/" << max_pool_size
+ << ", success=" << success_count << ", failed=" <<
failure_count
+ << ", elapsed_ms=" << total_elapsed_ms;
+ }
+
Status s = futures[i].get();
if (s.ok() && temp_processes[i]) {
versioned_pool->processes.emplace_back(std::move(temp_processes[i]));
@@ -145,9 +165,13 @@ PythonServerManager::_ensure_pool_initialized(const
PythonVersion& version) {
max_pool_size));
}
+ const auto total_elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() -
init_start_time)
+ .count();
LOG(INFO) << "Python process pool initialized for version " <<
version.to_string()
<< ": created " << success_count << " processes"
- << (failure_count > 0 ? fmt::format(" ({} failed)",
failure_count) : "");
+ << (failure_count > 0 ? fmt::format(" ({} failed)",
failure_count) : "")
+ << ", elapsed_ms=" << total_elapsed_ms;
versioned_pool->initialized = true;
_start_health_check_thread();
diff --git a/be/test/udf/python/python_server_test.cpp
b/be/test/udf/python/python_server_test.cpp
index 557815f7506..8fb006f35b7 100644
--- a/be/test/udf/python/python_server_test.cpp
+++ b/be/test/udf/python/python_server_test.cpp
@@ -403,6 +403,23 @@ TEST_F(PythonServerTest, EnsurePoolInitializedSuccess) {
mgr.shutdown();
}
+TEST_F(PythonServerTest,
EnsurePoolInitializedLogsProgressWhileWaitingForSlowProcess) {
+ setup_doris_home();
+ std::string python_path =
+
create_fake_python_with_delay_and_socket_creation("python3.delayed", "3.9.16",
200);
+
+ config::max_python_process_num = 1;
+
+ PythonServerManager mgr;
+ PythonVersion version("3.9.16", test_dir_, python_path);
+
+ auto result = mgr._ensure_pool_initialized(version);
+
+ EXPECT_TRUE(result.has_value()) << result.error().to_string();
+
+ mgr.shutdown();
+}
+
TEST_F(PythonServerTest, EnsurePoolInitializedIdempotent) {
setup_doris_home();
std::string python_path =
create_fake_python_with_socket_creation("3.9.16");
diff --git a/be/test/udf/python/python_udf_runtime_test.cpp
b/be/test/udf/python/python_udf_runtime_test.cpp
index c6ee6f00472..99728c0500a 100644
--- a/be/test/udf/python/python_udf_runtime_test.cpp
+++ b/be/test/udf/python/python_udf_runtime_test.cpp
@@ -205,9 +205,10 @@ TEST_F(PythonUDFRuntimeTest, FlightServerPathTemplate) {
// ============================================================================
TEST_F(PythonUDFRuntimeTest, ShutdownTerminatesProcess) {
- // Use /bin/cat which blocks waiting for stdin - reliable and fast
+ // Use sleep instead of a stdin-driven command. In CI, stdin may be closed
and
+ // commands like cat can exit before running() is checked.
bp::ipstream output;
- bp::child child("/bin/cat", bp::std_out > output);
+ bp::child child("/bin/sleep", "60", bp::std_out > output);
ASSERT_TRUE(child.valid());
ASSERT_TRUE(child.running());
@@ -229,7 +230,7 @@ TEST_F(PythonUDFRuntimeTest, ShutdownTerminatesProcess) {
TEST_F(PythonUDFRuntimeTest, ShutdownIdempotent) {
bp::ipstream output;
- bp::child child("/bin/cat", bp::std_out > output);
+ bp::child child("/bin/sleep", "60", bp::std_out > output);
PythonUDFProcess process(std::move(child), std::move(output));
@@ -247,7 +248,7 @@ TEST_F(PythonUDFRuntimeTest, ShutdownIdempotent) {
TEST_F(PythonUDFRuntimeTest, ShutdownWithStubbornProcess) {
// Create a process that ignores SIGTERM - tests the SIGKILL fallback path
bp::ipstream output;
- bp::child child("/bin/bash", "-c", "trap '' TERM; cat", bp::std_out >
output);
+ bp::child child("/bin/bash", "-c", "trap '' TERM; exec sleep 60",
bp::std_out > output);
PythonUDFProcess process(std::move(child), std::move(output));
EXPECT_TRUE(process.is_alive());
@@ -265,7 +266,7 @@ TEST_F(PythonUDFRuntimeTest, ShutdownWithStubbornProcess) {
TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketExistingFile) {
bp::ipstream output;
- bp::child child("/bin/cat", bp::std_out > output);
+ bp::child child("/bin/sleep", "60", bp::std_out > output);
pid_t child_pid = child.id();
PythonUDFProcess process(std::move(child), std::move(output));
@@ -284,7 +285,7 @@ TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketExistingFile) {
TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketNonExistent) {
bp::ipstream output;
- bp::child child("/bin/cat", bp::std_out > output);
+ bp::child child("/bin/sleep", "60", bp::std_out > output);
pid_t child_pid = child.id();
PythonUDFProcess process(std::move(child), std::move(output));
@@ -300,7 +301,7 @@ TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketNonExistent) {
TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketIsDirectory) {
bp::ipstream output;
- bp::child child("/bin/cat", bp::std_out > output);
+ bp::child child("/bin/sleep", "60", bp::std_out > output);
pid_t child_pid = child.id();
@@ -326,7 +327,7 @@ TEST_F(PythonUDFRuntimeTest, RemoveUnixSocketIsDirectory) {
TEST_F(PythonUDFRuntimeTest, ToStringFormat) {
bp::ipstream output;
- bp::child child("/bin/cat", bp::std_out > output);
+ bp::child child("/bin/sleep", "60", bp::std_out > output);
pid_t child_pid = child.id();
PythonUDFProcess process(std::move(child), std::move(output));
@@ -350,7 +351,7 @@ TEST_F(PythonUDFRuntimeTest, ToStringFormat) {
TEST_F(PythonUDFRuntimeTest, GetUri) {
bp::ipstream output;
- bp::child child("/bin/cat", bp::std_out > output);
+ bp::child child("/bin/sleep", "60", bp::std_out > output);
pid_t child_pid = child.id();
PythonUDFProcess process(std::move(child), std::move(output));
@@ -367,7 +368,7 @@ TEST_F(PythonUDFRuntimeTest, GetUri) {
TEST_F(PythonUDFRuntimeTest, GetSocketFilePath) {
bp::ipstream output;
- bp::child child("/bin/cat", bp::std_out > output);
+ bp::child child("/bin/sleep", "60", bp::std_out > output);
pid_t child_pid = child.id();
PythonUDFProcess process(std::move(child), std::move(output));
@@ -389,8 +390,8 @@ TEST_F(PythonUDFRuntimeTest, GetSocketFilePath) {
TEST_F(PythonUDFRuntimeTest, ProcessEquality) {
bp::ipstream output1, output2;
- bp::child child1("/bin/cat", bp::std_out > output1);
- bp::child child2("/bin/cat", bp::std_out > output2);
+ bp::child child1("/bin/sleep", "60", bp::std_out > output1);
+ bp::child child2("/bin/sleep", "60", bp::std_out > output2);
PythonUDFProcess process1(std::move(child1), std::move(output1));
PythonUDFProcess process2(std::move(child2), std::move(output2));
@@ -410,7 +411,7 @@ TEST_F(PythonUDFRuntimeTest, DestructorCallsShutdown) {
pid_t child_pid;
{
bp::ipstream output;
- bp::child child("/bin/cat", bp::std_out > output);
+ bp::child child("/bin/sleep", "60", bp::std_out > output);
child_pid = child.id();
PythonUDFProcess process(std::move(child), std::move(output));
@@ -431,7 +432,7 @@ TEST_F(PythonUDFRuntimeTest, DestructorCallsShutdown) {
TEST_F(PythonUDFRuntimeTest, IsAliveReflectsState) {
bp::ipstream output;
- bp::child child("/bin/cat", bp::std_out > output);
+ bp::child child("/bin/sleep", "60", bp::std_out > output);
PythonUDFProcess process(std::move(child), std::move(output));
diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
index 01753481dc3..4b7694bc286 100644
--- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
@@ -63,7 +63,7 @@ fail_when_segment_rows_not_in_rowset_meta=true
enable_python_udf_support=true
python_env_mode=conda
python_conda_root_path=/opt/miniconda3
-max_python_process_num=64
+max_python_process_num=16
enable_cloud_make_rs_visible_on_be=true
cloud_mow_sync_rowsets_when_load_txn_begin=false
diff --git a/regression-test/pipeline/cloud_p1/conf/be_custom.conf
b/regression-test/pipeline/cloud_p1/conf/be_custom.conf
index 0b9d27e98a7..ade5308ded6 100644
--- a/regression-test/pipeline/cloud_p1/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p1/conf/be_custom.conf
@@ -45,7 +45,7 @@ enable_segment_rows_check_core=true
enable_python_udf_support=true
python_env_mode=conda
python_conda_root_path=/opt/miniconda3
-max_python_process_num=64
+max_python_process_num=16
enable_cloud_make_rs_visible_on_be=true
cloud_mow_sync_rowsets_when_load_txn_begin=false
diff --git a/regression-test/pipeline/external/conf/be.conf
b/regression-test/pipeline/external/conf/be.conf
index 073c5d9990d..f29f342366e 100644
--- a/regression-test/pipeline/external/conf/be.conf
+++ b/regression-test/pipeline/external/conf/be.conf
@@ -77,4 +77,4 @@ enable_graceful_exit_check=true
enable_python_udf_support=true
python_env_mode=venv
python_venv_interpreter_paths=/usr/bin/python
-max_python_process_num=64
+max_python_process_num=16
diff --git a/regression-test/pipeline/nonConcurrent/conf/be.conf
b/regression-test/pipeline/nonConcurrent/conf/be.conf
index 2a3031651cf..b308eb8ab62 100644
--- a/regression-test/pipeline/nonConcurrent/conf/be.conf
+++ b/regression-test/pipeline/nonConcurrent/conf/be.conf
@@ -98,4 +98,4 @@ enable_segment_rows_check_core=true
enable_python_udf_support=true
python_env_mode=venv
python_venv_interpreter_paths=/usr/bin/python
-max_python_process_num=64
+max_python_process_num=16
diff --git a/regression-test/pipeline/p0/conf/be.conf
b/regression-test/pipeline/p0/conf/be.conf
index 0fff884d858..b732395e7e4 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -101,4 +101,4 @@ fail_when_segment_rows_not_in_rowset_meta=true
enable_python_udf_support=true
python_env_mode=venv
python_venv_interpreter_paths=/usr/bin/python
-max_python_process_num=64
+max_python_process_num=16
diff --git a/regression-test/pipeline/p1/conf/be.conf
b/regression-test/pipeline/p1/conf/be.conf
index 29646ae3025..8f68115270c 100644
--- a/regression-test/pipeline/p1/conf/be.conf
+++ b/regression-test/pipeline/p1/conf/be.conf
@@ -84,4 +84,4 @@ enable_segment_rows_check_core=true
enable_python_udf_support=true
python_env_mode=venv
python_venv_interpreter_paths=/usr/bin/python
-max_python_process_num=64
+max_python_process_num=16
diff --git a/regression-test/pipeline/vault_p0/conf/be_custom.conf
b/regression-test/pipeline/vault_p0/conf/be_custom.conf
index 85a86ea733e..779eb5a77a5 100644
--- a/regression-test/pipeline/vault_p0/conf/be_custom.conf
+++ b/regression-test/pipeline/vault_p0/conf/be_custom.conf
@@ -44,4 +44,4 @@ enable_brpc_connection_check=true
enable_python_udf_support=true
python_env_mode=venv
python_venv_interpreter_paths=/usr/bin/python
-max_python_process_num=64
+max_python_process_num=16
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]