This is an automated email from the ASF dual-hosted git repository.
HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3f3c79c01c7 [Fix](pyudf) make Python server pool selection alive-aware
and version-isolated (#62620)
3f3c79c01c7 is described below
commit 3f3c79c01c7d386122058a33db8bcd1fc3b45c77
Author: linrrarity <[email protected]>
AuthorDate: Thu May 7 21:04:28 2026 +0800
[Fix](pyudf) make Python server pool selection alive-aware and
version-isolated (#62620)
`PythonServerManager` did not check whether the Python process for the
requested version was still alive before handing it out, which could cause
errors like:
```text
java.lang.IllegalStateException: PYTHON_UDF_BLOCKED
suite=python_udf_cross_feature_import_storage
scenario=python_udf_cross_feature_import_storage.inline_runtime reason=inline
probe failed. reason=errCode = 2, detailMessage =
(172.20.49.73)[INTERNAL_ERROR]IOError: Flight stream finish failed with gRPC
code 14, message: failed to connect to all addresses; last error: UNKNOWN:
unix:/tmp/doris_python_udf_55799.sock: Connection refused
```
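After this change, pool selection only picks among alive processes (still
load-balanced by active client count). If no process in the pool is alive, one
replacement is forked in the foreground so the request can proceed, and the
background health checker repairs the rest. Each Python version now has its own
pool and mutex, so initializing or using one version no longer blocks another.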
---
be/src/udf/python/python_server.cpp | 153 +++++++++++----
be/src/udf/python/python_server.h | 46 +++--
be/test/udf/python/python_server_test.cpp | 212 +++++++++++++++++----
.../data/pythonudaf_p0/test_pythonudaf_drop.out | 6 +
.../data/pythonudf_p0/test_pythonudf_drop.out | 6 +
.../data/pythonudtf_p0/test_pythonudtf_drop.out | 8 +
.../pythonudaf_p0/test_pythonudaf_drop.groovy | 39 +++-
.../suites/pythonudf_p0/test_pythonudf_drop.groovy | 39 +++-
.../pythonudtf_p0/test_pythonudtf_drop.groovy | 51 ++++-
9 files changed, 465 insertions(+), 95 deletions(-)
diff --git a/be/src/udf/python/python_server.cpp
b/be/src/udf/python/python_server.cpp
index 646e1e79039..228cab8d905 100644
--- a/be/src/udf/python/python_server.cpp
+++ b/be/src/udf/python/python_server.cpp
@@ -27,6 +27,7 @@
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <fstream>
+#include <future>
#include "arrow/flight/client.h"
#include "common/config.h"
@@ -37,32 +38,70 @@
namespace doris {
-template <typename T>
+std::shared_ptr<PythonServerManager::VersionedProcessPool>
+PythonServerManager::_get_or_create_process_pool(const PythonVersion& version)
{
+ std::lock_guard<std::mutex> lock(_pools_mutex);
+ auto& pool = _process_pools[version];
+ if (!pool) {
+ pool = std::make_shared<VersionedProcessPool>();
+ }
+ return pool;
+}
+
+std::vector<std::pair<PythonVersion,
std::shared_ptr<PythonServerManager::VersionedProcessPool>>>
+PythonServerManager::_snapshot_process_pools() {
+ std::lock_guard<std::mutex> lock(_pools_mutex);
+ std::vector<std::pair<PythonVersion,
std::shared_ptr<VersionedProcessPool>>> snapshot;
+ snapshot.reserve(_process_pools.size());
+ for (const auto& [version, pool] : _process_pools) {
+ snapshot.emplace_back(version, pool);
+ }
+ return snapshot;
+}
+
+#ifdef BE_TEST
+void PythonServerManager::set_process_pool_for_test(const PythonVersion&
version,
+ std::vector<ProcessPtr>
processes,
+ bool initialized) {
+ auto versioned_pool = _get_or_create_process_pool(version);
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ versioned_pool->processes = std::move(processes);
+ versioned_pool->initialized = initialized;
+}
+
+std::vector<ProcessPtr>& PythonServerManager::process_pool_for_test(const
PythonVersion& version) {
+ auto versioned_pool = _get_or_create_process_pool(version);
+ return versioned_pool->processes;
+}
+#endif
+
+template <typename ClientType>
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const
PythonVersion& version,
- std::shared_ptr<T>* client,
+ std::shared_ptr<ClientType>* client,
const std::shared_ptr<arrow::Schema>&
data_schema) {
- // Ensure process pool is initialized for this version
- RETURN_IF_ERROR(ensure_pool_initialized(version));
+ std::shared_ptr<VersionedProcessPool> versioned_pool =
+ DORIS_TRY(_ensure_pool_initialized(version));
ProcessPtr process;
- RETURN_IF_ERROR(get_process(version, &process));
+ RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
- if constexpr (std::is_same_v<T, PythonUDAFClient>) {
- RETURN_IF_ERROR(T::create(func_meta, std::move(process), data_schema,
client));
+ if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
+ RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process),
data_schema, client));
} else {
- RETURN_IF_ERROR(T::create(func_meta, std::move(process), client));
+ RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process),
client));
}
return Status::OK();
}
-Status PythonServerManager::ensure_pool_initialized(const PythonVersion&
version) {
- std::lock_guard<std::mutex> lock(_pools_mutex);
+Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
+PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
+ auto versioned_pool = _get_or_create_process_pool(version);
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
// Check if already initialized
- if (_initialized_versions.count(version)) return Status::OK();
+ if (versioned_pool->initialized) return versioned_pool;
- std::vector<ProcessPtr>& pool = _process_pools[version];
// 0 means use CPU core count as default, otherwise use the specified value
int max_pool_size = config::max_python_process_num > 0 ?
config::max_python_process_num
:
CpuInfo::num_cores();
@@ -91,7 +130,7 @@ Status PythonServerManager::ensure_pool_initialized(const
PythonVersion& version
for (int i = 0; i < max_pool_size; i++) {
Status s = futures[i].get();
if (s.ok() && temp_processes[i]) {
- pool.push_back(std::move(temp_processes[i]));
+
versioned_pool->processes.emplace_back(std::move(temp_processes[i]));
success_count++;
} else {
failure_count++;
@@ -100,38 +139,64 @@ Status PythonServerManager::ensure_pool_initialized(const
PythonVersion& version
}
}
- if (pool.empty()) {
- return Status::InternalError(
+ if (versioned_pool->processes.empty()) {
+ return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
"Failed to initialize Python process pool: all {} process
creation attempts failed",
- max_pool_size);
+ max_pool_size));
}
LOG(INFO) << "Python process pool initialized for version " <<
version.to_string()
<< ": created " << success_count << " processes"
<< (failure_count > 0 ? fmt::format(" ({} failed)",
failure_count) : "");
- _initialized_versions.insert(version);
+ versioned_pool->initialized = true;
_start_health_check_thread();
- return Status::OK();
+ return versioned_pool;
}
-Status PythonServerManager::get_process(const PythonVersion& version,
ProcessPtr* process) {
- std::lock_guard<std::mutex> lock(_pools_mutex);
- std::vector<ProcessPtr>& pool = _process_pools[version];
+Status PythonServerManager::_get_process(
+ const PythonVersion& version, const
std::shared_ptr<VersionedProcessPool>& versioned_pool,
+ ProcessPtr* process) {
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ std::vector<ProcessPtr>& pool = versioned_pool->processes;
if (UNLIKELY(pool.empty())) {
return Status::InternalError("Python process pool is empty for version
{}",
version.to_string());
}
- // Find process with minimum load (use_count - 1 gives active client count)
- auto min_iter = std::min_element(
- pool.begin(), pool.end(),
- [](const ProcessPtr& a, const ProcessPtr& b) { return
a.use_count() < b.use_count(); });
+ // Prefer an already-alive process and only use load balancing inside that
alive subset.
+ // Dead entries are left in the pool for the background health checker to repair,
+ // unless no alive process remains for the current request.
+ auto min_alive_iter = std::min_element(pool.begin(), pool.end(),
+ [](const ProcessPtr& a, const
ProcessPtr& b) {
+ const bool a_alive = a &&
a->is_alive();
+ const bool b_alive = b &&
b->is_alive();
+ if (a_alive != b_alive) {
+ return a_alive > b_alive;
+ }
+ return a.use_count() <
b.use_count();
+ });
+
+ if (min_alive_iter != pool.end() && *min_alive_iter &&
(*min_alive_iter)->is_alive()) {
+ *process = *min_alive_iter;
+ return Status::OK();
+ }
+
+ // Only reach here when the pool has no alive process at all. Try one
foreground
+ // recovery so the caller has a chance to proceed; leave batch repair to
health check.
+ auto& candidate = pool.front();
+ ProcessPtr replacement;
+ Status status = fork(version, &replacement);
+ if (!status.ok()) {
+ return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
+ "Python process pool has no available process for version {},
reason: {}",
+ version.to_string(), status.to_string());
+ }
- // Return process with minimum load
- *process = *min_iter;
+ candidate = std::move(replacement);
+ *process = candidate;
return Status::OK();
}
@@ -191,6 +256,7 @@ Status PythonServerManager::fork(const PythonVersion&
version, ProcessPtr* proce
}
void PythonServerManager::_start_health_check_thread() {
+ std::lock_guard<std::mutex> lock(_health_check_mutex);
if (_health_check_thread) return;
LOG(INFO) << "Starting Python process health check thread (interval: 30
seconds)";
@@ -217,13 +283,13 @@ void PythonServerManager::_start_health_check_thread() {
}
void PythonServerManager::_check_and_recreate_processes() {
- std::lock_guard<std::mutex> lock(_pools_mutex);
-
int total_checked = 0;
int total_dead = 0;
int total_recreated = 0;
- for (auto& [version, pool] : _process_pools) {
+ for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ auto& pool = versioned_pool->processes;
for (size_t i = 0; i < pool.size(); ++i) {
auto& process = pool[i];
if (!process) continue;
@@ -268,15 +334,22 @@ void PythonServerManager::shutdown() {
}
// Shutdown all processes
- std::lock_guard<std::mutex> lock(_pools_mutex);
- for (auto& [version, pool] : _process_pools) {
+ for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ auto& pool = versioned_pool->processes;
for (auto& process : pool) {
if (process) {
process->shutdown();
}
}
+ pool.clear();
+ versioned_pool->initialized = false;
+ }
+
+ {
+ std::lock_guard<std::mutex> lock(_pools_mutex);
+ _process_pools.clear();
}
- _process_pools.clear();
}
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes)
{
@@ -305,11 +378,11 @@ Status PythonServerManager::_read_process_memory(pid_t
pid, size_t* rss_bytes) {
}
void PythonServerManager::_refresh_memory_stats() {
- std::lock_guard<std::mutex> lock(_pools_mutex);
-
int64_t total_rss = 0;
- for (const auto& [version, pool] : _process_pools) {
+ for (const auto& [version, versioned_pool] : _snapshot_process_pools()) {
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ const auto& pool = versioned_pool->processes;
for (const auto& process : pool) {
if (!process || !process->is_alive()) continue;
@@ -339,15 +412,15 @@ Status PythonServerManager::clear_module_cache(const
std::string& location) {
return Status::InvalidArgument("Empty location for
clear_module_cache");
}
- std::lock_guard<std::mutex> lock(_pools_mutex);
-
std::string body = fmt::format(R"({{"location": "{}"}})", location);
int success_count = 0;
int fail_count = 0;
bool has_active_process = false;
- for (auto& [version, pool] : _process_pools) {
+ for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
+ std::lock_guard<std::mutex> lock(versioned_pool->mutex);
+ auto& pool = versioned_pool->processes;
for (auto& process : pool) {
if (!process || !process->is_alive()) {
continue;
@@ -422,4 +495,4 @@ template Status
PythonServerManager::get_client<PythonUDTFClient>(
std::shared_ptr<PythonUDTFClient>* client,
const std::shared_ptr<arrow::Schema>& data_schema);
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/udf/python/python_server.h
b/be/src/udf/python/python_server.h
index 6427cb7e63c..7aa452740c7 100644
--- a/be/src/udf/python/python_server.h
+++ b/be/src/udf/python/python_server.h
@@ -20,7 +20,10 @@
#include <atomic>
#include <condition_variable>
#include <memory>
+#include <mutex>
#include <thread>
+#include <unordered_map>
+#include <vector>
#include "common/status.h"
#include "runtime/memory/mem_tracker.h"
@@ -46,25 +49,41 @@ public:
Status fork(const PythonVersion& version, ProcessPtr* process);
- Status get_process(const PythonVersion& version, ProcessPtr* process);
-
// Clear Python module cache for a specific UDF location across all
processes
Status clear_module_cache(const std::string& location);
- Status ensure_pool_initialized(const PythonVersion& version);
-
void shutdown();
#ifdef BE_TEST
// For unit testing only.
void check_and_recreate_processes_for_test() {
_check_and_recreate_processes(); }
- std::unordered_map<PythonVersion, std::vector<ProcessPtr>>&
process_pools_for_test() {
- return _process_pools;
- }
+ void set_process_pool_for_test(const PythonVersion& version,
std::vector<ProcessPtr> processes,
+ bool initialized = true);
+
+ std::vector<ProcessPtr>& process_pool_for_test(const PythonVersion&
version);
#endif
private:
+ struct VersionedProcessPool {
+ std::mutex mutex;
+ std::vector<ProcessPtr> processes;
+ bool initialized = false;
+ };
+
+ /**
+ * Lazily initialize and return the process pool for specific Python
version.
+ */
+ Result<std::shared_ptr<VersionedProcessPool>> _ensure_pool_initialized(
+ const PythonVersion& version);
+
+ /**
+ * Pick an available process from specific pool, recreating one on demand
if needed.
+ */
+ Status _get_process(const PythonVersion& version,
+ const std::shared_ptr<VersionedProcessPool>&
versioned_pool,
+ ProcessPtr* process);
+
/**
* Start health check background thread (called once by
ensure_pool_initialized)
* Thread periodically checks process health and refreshes memory stats
@@ -86,11 +105,14 @@ private:
*/
void _refresh_memory_stats();
- std::unordered_map<PythonVersion, std::vector<ProcessPtr>> _process_pools;
- // Protects _process_pools access
+ std::shared_ptr<VersionedProcessPool> _get_or_create_process_pool(const
PythonVersion& version);
+ std::vector<std::pair<PythonVersion,
std::shared_ptr<VersionedProcessPool>>>
+ _snapshot_process_pools();
+
+ std::unordered_map<PythonVersion, std::shared_ptr<VersionedProcessPool>>
_process_pools;
+ // Protects the version -> pool handle map only. Per-version process
operations are guarded
+ // by VersionedProcessPool::mutex.
std::mutex _pools_mutex;
- // Track which versions have been initialized
- std::unordered_set<PythonVersion> _initialized_versions;
// Health check background thread
std::unique_ptr<std::thread> _health_check_thread;
std::atomic<bool> _shutdown_flag {false};
@@ -99,4 +121,4 @@ private:
MemTracker _mem_tracker {"PythonUDFProcesses"};
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/udf/python/python_server_test.cpp
b/be/test/udf/python/python_server_test.cpp
index 40e4ab3a11a..557815f7506 100644
--- a/be/test/udf/python/python_server_test.cpp
+++ b/be/test/udf/python/python_server_test.cpp
@@ -23,6 +23,7 @@
#include <filesystem>
#include <fstream>
+#include <future>
#include <string>
#include "common/config.h"
@@ -99,6 +100,32 @@ protected:
return python_path;
}
+ std::string create_fake_python_with_delay_and_socket_creation(const
std::string& binary_name,
+ const
std::string& version,
+ int
delay_ms) {
+ std::string bin_dir = test_dir_ + "/bin";
+ std::string python_path = bin_dir + "/" + binary_name;
+ fs::create_directories(bin_dir);
+
+ std::ofstream ofs(python_path);
+ ofs << "#!/bin/bash\n";
+ ofs << "if [ \"$1\" = \"--version\" ]; then\n";
+ ofs << " echo 'Python " << version << "'\n";
+ ofs << " exit 0\n";
+ ofs << "fi\n";
+ ofs << "sleep " << (delay_ms / 1000.0) << "\n";
+ ofs << "SOCKET_PREFIX=\"$3\"\n";
+ ofs << "SOCKET_BASE=\"${SOCKET_PREFIX#grpc+unix://}\"\n";
+ ofs << "SOCKET_FILE=\"${SOCKET_BASE}_$$.sock\"\n";
+ ofs << "touch \"$SOCKET_FILE\"\n";
+ ofs << "trap 'rm -f \"$SOCKET_FILE\"; exit 0' TERM INT\n";
+ ofs << "while true; do sleep 1; done\n";
+ ofs.close();
+ fs::permissions(python_path, fs::perms::owner_all);
+
+ return python_path;
+ }
+
// Set DORIS_HOME and create flight server script directory
void setup_doris_home() {
setenv("DORIS_HOME", test_dir_.c_str(), 1);
@@ -124,18 +151,21 @@ TEST_F(PythonServerTest, SingletonReturnsSameInstance) {
}
// ============================================================================
-// PythonServerManager::get_process() - process retrieval test
+// PythonServerManager::_get_process() - process retrieval test
// ============================================================================
TEST_F(PythonServerTest, GetProcessFromEmptyPoolReturnsError) {
PythonServerManager mgr;
PythonVersion version("3.9.16", "/fake/path", "/fake/python");
- ProcessPtr process;
+ mgr.set_process_pool_for_test(version, {});
+ auto pool_result = mgr._ensure_pool_initialized(version);
+ ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string();
- Status status = mgr.get_process(version, &process);
+ ProcessPtr process;
+ Status status = mgr._get_process(version, pool_result.value(), &process);
- // Verify: empty pool should return an error with message containing "pool
is empty"
+ // Verify: empty pool should return an error before touching process slots.
EXPECT_FALSE(status.ok());
EXPECT_TRUE(status.to_string().find("pool is empty") != std::string::npos);
EXPECT_EQ(process, nullptr);
@@ -223,7 +253,7 @@ TEST_F(PythonServerTest,
ForkWithProcessThatExitsImmediatelyReturnsError) {
}
// ============================================================================
-// PythonServerManager::ensure_pool_initialized() - pool initialization test
+// PythonServerManager::_ensure_pool_initialized() - pool initialization test
// ============================================================================
TEST_F(PythonServerTest, EnsurePoolInitializedWithInvalidVersionFails) {
@@ -231,13 +261,13 @@ TEST_F(PythonServerTest,
EnsurePoolInitializedWithInvalidVersionFails) {
PythonVersion invalid_version("3.99.99", "/non/existent/path",
"/non/existent/python");
- Status status = mgr.ensure_pool_initialized(invalid_version);
+ auto result = mgr._ensure_pool_initialized(invalid_version);
// Verify: invalid version should cause initialization to fail
- EXPECT_FALSE(status.ok());
+ EXPECT_FALSE(result.has_value());
// Error message should indicate all process creations failed
- EXPECT_TRUE(status.to_string().find("Failed") != std::string::npos ||
- status.to_string().find("failed") != std::string::npos);
+ EXPECT_TRUE(result.error().to_string().find("Failed") != std::string::npos
||
+ result.error().to_string().find("failed") !=
std::string::npos);
}
// ============================================================================
@@ -267,8 +297,8 @@ TEST_F(PythonServerTest,
ShutdownAfterFailedInitializationDoesNotCrash) {
// Try initialization first (expected to fail)
PythonVersion invalid_version("3.99.99", "/bad/path", "/bad/python");
- Status status = mgr.ensure_pool_initialized(invalid_version);
- EXPECT_FALSE(status.ok());
+ auto result = mgr._ensure_pool_initialized(invalid_version);
+ EXPECT_FALSE(result.has_value());
// Verify: calling shutdown after failed initialization does not crash
EXPECT_NO_THROW(mgr.shutdown());
@@ -364,10 +394,10 @@ TEST_F(PythonServerTest, EnsurePoolInitializedSuccess) {
PythonServerManager mgr;
PythonVersion version("3.9.16", test_dir_, python_path);
- Status status = mgr.ensure_pool_initialized(version);
+ auto result = mgr._ensure_pool_initialized(version);
// Verify pool initialization succeeded
- EXPECT_TRUE(status.ok()) << status.to_string();
+ EXPECT_TRUE(result.has_value()) << result.error().to_string();
// Cleanup
mgr.shutdown();
@@ -383,12 +413,12 @@ TEST_F(PythonServerTest, EnsurePoolInitializedIdempotent)
{
PythonVersion version("3.9.16", test_dir_, python_path);
// First initialization
- Status status1 = mgr.ensure_pool_initialized(version);
- EXPECT_TRUE(status1.ok()) << status1.to_string();
+ auto result1 = mgr._ensure_pool_initialized(version);
+ EXPECT_TRUE(result1.has_value()) << result1.error().to_string();
// Second initialization should return immediately (version already
initialized)
- Status status2 = mgr.ensure_pool_initialized(version);
- EXPECT_TRUE(status2.ok()) << status2.to_string();
+ auto result2 = mgr._ensure_pool_initialized(version);
+ EXPECT_TRUE(result2.has_value()) << result2.error().to_string();
mgr.shutdown();
}
@@ -403,12 +433,12 @@ TEST_F(PythonServerTest, GetProcessFromInitializedPool) {
PythonVersion version("3.9.16", test_dir_, python_path);
// Initialize the pool first
- Status init_status = mgr.ensure_pool_initialized(version);
- EXPECT_TRUE(init_status.ok()) << init_status.to_string();
+ auto init_result = mgr._ensure_pool_initialized(version);
+ EXPECT_TRUE(init_result.has_value()) << init_result.error().to_string();
// Get a process
ProcessPtr process;
- Status status = mgr.get_process(version, &process);
+ Status status = mgr._get_process(version, init_result.value(), &process);
EXPECT_TRUE(status.ok()) << status.to_string();
EXPECT_NE(process, nullptr);
@@ -417,6 +447,72 @@ TEST_F(PythonServerTest, GetProcessFromInitializedPool) {
mgr.shutdown();
}
+TEST_F(PythonServerTest, GetProcessRecreatesDeadProcessWhenNoAliveProcess) {
+ setup_doris_home();
+ std::string python_path =
create_fake_python_with_socket_creation("3.9.16");
+
+ config::max_python_process_num = 1;
+
+ PythonServerManager mgr;
+ PythonVersion version("3.9.16", test_dir_, python_path);
+
+ auto pool_result = mgr._ensure_pool_initialized(version);
+ ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string();
+
+ ProcessPtr first_process;
+ ASSERT_TRUE(mgr._get_process(version, pool_result.value(),
&first_process).ok());
+ ASSERT_NE(first_process, nullptr);
+ ASSERT_TRUE(first_process->is_alive());
+ pid_t first_pid = first_process->get_child_pid();
+
+ first_process->shutdown();
+ ASSERT_FALSE(first_process->is_alive());
+
+ ProcessPtr replacement;
+ Status status = mgr._get_process(version, pool_result.value(),
&replacement);
+
+ EXPECT_TRUE(status.ok()) << status.to_string();
+ ASSERT_NE(replacement, nullptr);
+ EXPECT_TRUE(replacement->is_alive());
+ EXPECT_NE(replacement->get_child_pid(), first_pid);
+
+ mgr.shutdown();
+}
+
+TEST_F(PythonServerTest, GetProcessSkipsDeadProcessWhenAliveProcessExists) {
+ setup_doris_home();
+ std::string python_path =
create_fake_python_with_socket_creation("3.9.16");
+
+ PythonServerManager mgr;
+ PythonVersion version("3.9.16", test_dir_, python_path);
+
+ ProcessPtr alive_process;
+ ASSERT_TRUE(mgr.fork(version, &alive_process).ok());
+ ASSERT_NE(alive_process, nullptr);
+ ASSERT_TRUE(alive_process->is_alive());
+
+ ProcessPtr dead_process;
+ ASSERT_TRUE(mgr.fork(version, &dead_process).ok());
+ ASSERT_NE(dead_process, nullptr);
+ pid_t dead_pid = dead_process->get_child_pid();
+ dead_process->shutdown();
+ ASSERT_FALSE(dead_process->is_alive());
+
+ mgr.set_process_pool_for_test(version, {alive_process, dead_process});
+ auto pool_result = mgr._ensure_pool_initialized(version);
+ ASSERT_TRUE(pool_result.has_value()) << pool_result.error().to_string();
+
+ ProcessPtr selected;
+ Status status = mgr._get_process(version, pool_result.value(), &selected);
+
+ EXPECT_TRUE(status.ok()) << status.to_string();
+ EXPECT_EQ(selected, alive_process);
+ EXPECT_FALSE(mgr.process_pool_for_test(version)[1]->is_alive());
+ EXPECT_EQ(mgr.process_pool_for_test(version)[1]->get_child_pid(),
dead_pid);
+
+ mgr.shutdown();
+}
+
TEST_F(PythonServerTest, GetProcessLoadBalancing) {
setup_doris_home();
std::string python_path =
create_fake_python_with_socket_creation("3.9.16");
@@ -427,15 +523,15 @@ TEST_F(PythonServerTest, GetProcessLoadBalancing) {
PythonServerManager mgr;
PythonVersion version("3.9.16", test_dir_, python_path);
- Status init_status = mgr.ensure_pool_initialized(version);
- EXPECT_TRUE(init_status.ok()) << init_status.to_string();
+ auto init_result = mgr._ensure_pool_initialized(version);
+ EXPECT_TRUE(init_result.has_value()) << init_result.error().to_string();
// Get multiple processes to verify load balancing
ProcessPtr p1, p2, p3, p4;
- EXPECT_TRUE(mgr.get_process(version, &p1).ok());
- EXPECT_TRUE(mgr.get_process(version, &p2).ok());
- EXPECT_TRUE(mgr.get_process(version, &p3).ok());
- EXPECT_TRUE(mgr.get_process(version, &p4).ok());
+ EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p1).ok());
+ EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p2).ok());
+ EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p3).ok());
+ EXPECT_TRUE(mgr._get_process(version, init_result.value(), &p4).ok());
// With 2 processes, load balancing distributes requests across different
processes
// p1 and p2 may be same or different processes
@@ -455,12 +551,12 @@ TEST_F(PythonServerTest, ShutdownWithRunningProcesses) {
PythonVersion version("3.9.16", test_dir_, python_path);
// Initialize the pool
- Status init_status = mgr.ensure_pool_initialized(version);
- EXPECT_TRUE(init_status.ok()) << init_status.to_string();
+ auto init_result = mgr._ensure_pool_initialized(version);
+ EXPECT_TRUE(init_result.has_value()) << init_result.error().to_string();
// Get a process reference
ProcessPtr process;
- EXPECT_TRUE(mgr.get_process(version, &process).ok());
+ EXPECT_TRUE(mgr._get_process(version, init_result.value(), &process).ok());
EXPECT_TRUE(process->is_alive());
// Shutdown should terminate all processes
@@ -509,13 +605,15 @@ TEST_F(PythonServerTest, MultipleVersionPools) {
PythonVersion version310("3.10.0", test_dir_, python310_path);
// Initialize pools for two versions
- EXPECT_TRUE(mgr.ensure_pool_initialized(version39).ok());
- EXPECT_TRUE(mgr.ensure_pool_initialized(version310).ok());
+ auto pool39_result = mgr._ensure_pool_initialized(version39);
+ auto pool310_result = mgr._ensure_pool_initialized(version310);
+ EXPECT_TRUE(pool39_result.has_value()) <<
pool39_result.error().to_string();
+ EXPECT_TRUE(pool310_result.has_value()) <<
pool310_result.error().to_string();
// Retrieve processes from both pools
ProcessPtr p39, p310;
- EXPECT_TRUE(mgr.get_process(version39, &p39).ok());
- EXPECT_TRUE(mgr.get_process(version310, &p310).ok());
+ EXPECT_TRUE(mgr._get_process(version39, pool39_result.value(), &p39).ok());
+ EXPECT_TRUE(mgr._get_process(version310, pool310_result.value(),
&p310).ok());
// Verify they are different processes
EXPECT_NE(p39->get_child_pid(), p310->get_child_pid());
@@ -523,6 +621,40 @@ TEST_F(PythonServerTest, MultipleVersionPools) {
mgr.shutdown();
}
+TEST_F(PythonServerTest,
EnsurePoolInitializedForDifferentVersionsDoesNotShareVersionLock) {
+ setup_doris_home();
+
+ config::max_python_process_num = 1;
+
+ std::string python39_path =
+ create_fake_python_with_delay_and_socket_creation("python3.9",
"3.9.16", 1200);
+ std::string python310_path =
+ create_fake_python_with_delay_and_socket_creation("python3.10",
"3.10.0", 1200);
+
+ PythonServerManager mgr;
+ PythonVersion version39("3.9.16", test_dir_, python39_path);
+ PythonVersion version310("3.10.0", test_dir_, python310_path);
+
+ auto start = std::chrono::steady_clock::now();
+ auto future39 = std::async(std::launch::async,
+ [&]() { return
mgr._ensure_pool_initialized(version39); });
+ auto future310 = std::async(std::launch::async,
+ [&]() { return
mgr._ensure_pool_initialized(version310); });
+
+ auto result39 = future39.get();
+ auto result310 = future310.get();
+ auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() - start);
+
+ EXPECT_TRUE(result39.has_value()) << result39.error().to_string();
+ EXPECT_TRUE(result310.has_value()) << result310.error().to_string();
+ // If both versions still contended on one manager-wide lock, the elapsed
time would
+ // be close to two serialized 1.2s startups instead of a single startup
window.
+ EXPECT_LT(elapsed.count(), 2200);
+
+ mgr.shutdown();
+}
+
// ============================================================================
// PythonServerManager::_check_and_recreate_processes() - health-check
recreation test
// ============================================================================
@@ -546,15 +678,15 @@ TEST_F(PythonServerTest,
CheckAndRecreateProcessesRecreatesDeadProcess) {
dead_process->shutdown();
ASSERT_FALSE(dead_process->is_alive());
- mgr.process_pools_for_test()[version] = {alive_process, dead_process,
nullptr};
+ mgr.set_process_pool_for_test(version, {alive_process, dead_process,
nullptr});
mgr.check_and_recreate_processes_for_test();
- ASSERT_EQ(mgr.process_pools_for_test()[version].size(), 3);
- EXPECT_EQ(mgr.process_pools_for_test()[version][0], alive_process);
- EXPECT_EQ(mgr.process_pools_for_test()[version][2], nullptr);
+ ASSERT_EQ(mgr.process_pool_for_test(version).size(), 3);
+ EXPECT_EQ(mgr.process_pool_for_test(version)[0], alive_process);
+ EXPECT_EQ(mgr.process_pool_for_test(version)[2], nullptr);
- ProcessPtr recreated = mgr.process_pools_for_test()[version][1];
+ ProcessPtr recreated = mgr.process_pool_for_test(version)[1];
ASSERT_NE(recreated, nullptr);
EXPECT_TRUE(recreated->is_alive());
EXPECT_NE(recreated->get_child_pid(), dead_pid_before);
@@ -582,11 +714,11 @@ TEST_F(PythonServerTest,
CheckAndRecreateProcessesErasesDeadProcessWhenRecreateF
ASSERT_FALSE(dead_process_2->is_alive());
PythonVersion invalid_version("3.9.16", test_dir_, test_dir_ +
"/bin/nonexistent_python");
- mgr.process_pools_for_test()[invalid_version] = {dead_process_1,
dead_process_2};
+ mgr.set_process_pool_for_test(invalid_version, {dead_process_1,
dead_process_2});
mgr.check_and_recreate_processes_for_test();
- EXPECT_TRUE(mgr.process_pools_for_test()[invalid_version].empty());
+ EXPECT_TRUE(mgr.process_pool_for_test(invalid_version).empty());
mgr.shutdown();
}
diff --git a/regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out
b/regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out
index 79e35e30ee5..8c1eb081162 100644
--- a/regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out
+++ b/regression-test/data/pythonudaf_p0/test_pythonudaf_drop.out
@@ -8,3 +8,9 @@
-- !py_udaf_drop_3 --
6
+-- !py_udaf_drop_4 --
+6
+
+-- !py_udaf_drop_5 --
+6
+
diff --git a/regression-test/data/pythonudf_p0/test_pythonudf_drop.out
b/regression-test/data/pythonudf_p0/test_pythonudf_drop.out
index 254ebe44809..84fa346f641 100644
--- a/regression-test/data/pythonudf_p0/test_pythonudf_drop.out
+++ b/regression-test/data/pythonudf_p0/test_pythonudf_drop.out
@@ -8,3 +8,9 @@
-- !py_udf_drop_3 --
8
+-- !py_udf_drop_4 --
+32
+
+-- !py_udf_drop_5 --
+33
+
diff --git a/regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out
b/regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out
index 6f1159a95d1..c86ee5bdc5c 100644
--- a/regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out
+++ b/regression-test/data/pythonudtf_p0/test_pythonudtf_drop.out
@@ -11,3 +11,11 @@
1
2
+-- !py_udtf_drop_4 --
+1
+2
+
+-- !py_udtf_drop_5 --
+1
+2
+
diff --git a/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy b/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
index 964413828ab..4b64921676f 100644
--- a/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
+++ b/regression-test/suites/pythonudaf_p0/test_pythonudaf_drop.groovy
@@ -15,10 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-suite('test_pythonudaf_drop') {
+suite('test_pythonudaf_drop', "nonConcurrent") {
def runtime_version = getPythonUdfRuntimeVersion()
def zipA = """${context.file.parent}/udaf_scripts/python_udaf_drop_a/python_udaf_drop_test.zip"""
def zipB = """${context.file.parent}/udaf_scripts/python_udaf_drop_b/python_udaf_drop_test.zip"""
+ def localDorisHome = System.getenv("DORIS_HOME")
+ def localUdfRoot = localDorisHome != null ? "${localDorisHome}/lib/udf" : "/tmp"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+
+ def execOnBackend = { be_ip, localCmd, remoteCmd ->
+ if (be_ip == "127.0.0.1" || be_ip == "localhost") {
+ cmd(localCmd)
+ } else {
+ sshExec("root", be_ip, remoteCmd, false)
+ }
+ }
scp_udf_file_to_all_be(zipA)
scp_udf_file_to_all_be(zipB)
@@ -88,9 +101,33 @@ suite('test_pythonudaf_drop') {
sql '''SELECT py_drop_sum_a(v) FROM py_udaf_drop_tbl;'''
exception 'Can not found function'
}
+
+ // Case 3: kill Python servers between two aggregate queries, next CREATE handshake should recover
+ sql '''DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT)'''
+ sql """
+ CREATE AGGREGATE FUNCTION py_drop_sum_reconnect(INT) RETURNS BIGINT PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "file" = "file://${zipA}",
+ "symbol" = "drop_udaf.SumAgg",
+ "runtime_version" = "${runtime_version}"
+ )
+ """
+
+ qt_py_udaf_drop_4 '''SELECT py_drop_sum_reconnect(v) FROM py_udaf_drop_tbl;'''
+
+ backendId_to_backendIP.values().each { be_ip ->
+ execOnBackend(
+ be_ip,
+ "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true",
+ "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true")
+ }
+
+ qt_py_udaf_drop_5 '''SELECT py_drop_sum_reconnect(v) FROM py_udaf_drop_tbl;'''
+ try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);')
} finally {
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_once(INT);')
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_a(INT);')
try_sql('DROP FUNCTION IF EXISTS py_drop_sum_b(INT);')
+ try_sql('DROP FUNCTION IF EXISTS py_drop_sum_reconnect(INT);')
}
}
diff --git a/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy b/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
index ab103c21f25..3c2c5f9258b 100644
--- a/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
+++ b/regression-test/suites/pythonudf_p0/test_pythonudf_drop.groovy
@@ -15,10 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_pythonudf_drop") {
+suite("test_pythonudf_drop", "nonConcurrent") {
def runtime_version = getPythonUdfRuntimeVersion()
def zipA = """${context.file.parent}/udf_scripts/python_udf_drop_a/python_udf_drop_test.zip"""
def zipB = """${context.file.parent}/udf_scripts/python_udf_drop_b/python_udf_drop_test.zip"""
+ def localDorisHome = System.getenv("DORIS_HOME")
+ def localUdfRoot = localDorisHome != null ? "${localDorisHome}/lib/udf" : "/tmp"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+
+ def execOnBackend = { be_ip, localCmd, remoteCmd ->
+ if (be_ip == "127.0.0.1" || be_ip == "localhost") {
+ cmd(localCmd)
+ } else {
+ sshExec("root", be_ip, remoteCmd, false)
+ }
+ }
scp_udf_file_to_all_be(zipA)
scp_udf_file_to_all_be(zipB)
@@ -88,9 +101,33 @@ suite("test_pythonudf_drop") {
sql """SELECT py_drop_a(1);"""
exception "Can not found function"
}
+
+ // Case 3: kill Python servers between two queries, next client handshake should recover
+ sql """DROP FUNCTION IF EXISTS py_drop_reconnect(INT)"""
+ sql """
+ CREATE FUNCTION py_drop_reconnect(INT) RETURNS INT PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "file" = "file://${zipA}",
+ "symbol" = "drop_udf.evaluate",
+ "runtime_version" = "${runtime_version}"
+ )
+ """
+
+ qt_py_udf_drop_4 """SELECT py_drop_reconnect(31);"""
+
+ backendId_to_backendIP.values().each { be_ip ->
+ execOnBackend(
+ be_ip,
+ "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true",
+ "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true")
+ }
+
+ qt_py_udf_drop_5 """SELECT py_drop_reconnect(32);"""
+ try_sql("DROP FUNCTION IF EXISTS py_drop_reconnect(INT);")
} finally {
try_sql("DROP FUNCTION IF EXISTS py_drop_once(INT);")
try_sql("DROP FUNCTION IF EXISTS py_drop_a(INT);")
try_sql("DROP FUNCTION IF EXISTS py_drop_b(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_drop_reconnect(INT);")
}
}
diff --git a/regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy b/regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy
index 1f454243fb0..04abde6c146 100644
--- a/regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy
+++ b/regression-test/suites/pythonudtf_p0/test_pythonudtf_drop.groovy
@@ -15,10 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_pythonudtf_drop") {
+suite("test_pythonudtf_drop", "nonConcurrent") {
def runtime_version = getPythonUdfRuntimeVersion()
def zipA = """${context.file.parent}/udtf_scripts/python_udtf_drop_a/python_udtf_drop_test.zip"""
def zipB = """${context.file.parent}/udtf_scripts/python_udtf_drop_b/python_udtf_drop_test.zip"""
+ def localDorisHome = System.getenv("DORIS_HOME")
+ def localUdfRoot = localDorisHome != null ? "${localDorisHome}/lib/udf" : "/tmp"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+
+ def execOnBackend = { be_ip, localCmd, remoteCmd ->
+ if (be_ip == "127.0.0.1" || be_ip == "localhost") {
+ cmd(localCmd)
+ } else {
+ sshExec("root", be_ip, remoteCmd, false)
+ }
+ }
scp_udf_file_to_all_be(zipA)
scp_udf_file_to_all_be(zipB)
@@ -122,9 +135,45 @@ suite("test_pythonudtf_drop") {
"""
exception "Can not found function"
}
+
+ // Case 4: kill Python servers between two table-function queries, next handshake should recover
+ sql """DROP FUNCTION IF EXISTS py_drop_t_reconnect(INT)"""
+ sql """
+ CREATE TABLES FUNCTION py_drop_t_reconnect(INT)
+ RETURNS ARRAY<INT>
+ PROPERTIES (
+ "type" = "PYTHON_UDF",
+ "file" = "file://${zipA}",
+ "symbol" = "drop_udtf.process",
+ "runtime_version" = "${runtime_version}"
+ )
+ """
+
+ qt_py_udtf_drop_4 """
+ SELECT c
+ FROM py_udtf_drop_tbl
+ LATERAL VIEW py_drop_t_reconnect(v) tmp AS c
+ ORDER BY c;
+ """
+
+ backendId_to_backendIP.values().each { be_ip ->
+ execOnBackend(
+ be_ip,
+ "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true",
+ "pkill -f 'python_server.py grpc+unix:///tmp/doris_python_udf' || true")
+ }
+
+ qt_py_udtf_drop_5 """
+ SELECT c
+ FROM py_udtf_drop_tbl
+ LATERAL VIEW py_drop_t_reconnect(v) tmp AS c
+ ORDER BY c;
+ """
+ try_sql("DROP FUNCTION IF EXISTS py_drop_t_reconnect(INT);")
} finally {
try_sql("DROP FUNCTION IF EXISTS py_drop_t_once(INT);")
try_sql("DROP FUNCTION IF EXISTS py_drop_t_a(INT);")
try_sql("DROP FUNCTION IF EXISTS py_drop_t_b(INT);")
+ try_sql("DROP FUNCTION IF EXISTS py_drop_t_reconnect(INT);")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]