This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 023b766f5ce [fix](be) Validate task executor scan handles (#65054)
023b766f5ce is described below
commit 023b766f5ce44199388da5fbca37614ff38cda74
Author: Gabriel <[email protected]>
AuthorDate: Wed Jul 1 14:17:23 2026 +0800
[fix](be) Validate task executor scan handles (#65054)
Task-executor scan scheduling could pass a null or
invalid task handle into TimeSharingTaskExecutor. enqueue_splits and
related paths cast the base TaskHandle to TimeSharingTaskHandle and
immediately dereferenced the result, so a broken ScannerContext
task-handle invariant caused BE to crash with SIGSEGV instead of
returning a diagnostic error. This change validates scanner context,
scan task, and task handle before submitting scan splits, and validates
the task handle type at TimeSharingTaskExecutor entry points before
dereferencing it.
---
be/src/exec/scan/scanner_scheduler.h | 22 +++++++--
.../time_sharing/time_sharing_task_executor.cpp | 28 +++++++++--
.../time_sharing_task_executor_test.cpp | 57 ++++++++++++++++++++++
3 files changed, 100 insertions(+), 7 deletions(-)
diff --git a/be/src/exec/scan/scanner_scheduler.h
b/be/src/exec/scan/scanner_scheduler.h
index ad7ad09e239..bc4ac2c8075 100644
--- a/be/src/exec/scan/scanner_scheduler.h
+++ b/be/src/exec/scan/scanner_scheduler.h
@@ -292,13 +292,28 @@ public:
Status submit_scan_task(SimplifiedScanTask scan_task) override {
if (!_is_stop) {
+ if (scan_task.scanner_context == nullptr) {
+ return Status::InternalError<false>("scanner pool {} got null
scanner context.",
+ _sched_name);
+ }
+ if (scan_task.scan_task == nullptr) {
+ return Status::InternalError<false>("scanner pool {} got null
scan task.",
+ _sched_name);
+ }
+ auto task_handle = scan_task.scanner_context->task_handle();
+ if (task_handle == nullptr) {
+ return Status::InternalError<false>(
+ "scanner pool {} got null task handle, scan task first
schedule: {}, "
+ "scanner context: {}",
+ _sched_name, scan_task.scan_task->is_first_schedule,
+ scan_task.scanner_context->debug_string());
+ }
std::shared_ptr<SplitRunner> split_runner;
if (scan_task.scan_task->is_first_schedule) {
split_runner =
std::make_shared<ScannerSplitRunner>("scanner_split_runner",
scan_task.scan_func);
RETURN_IF_ERROR(split_runner->init());
- auto result = _task_executor->enqueue_splits(
- scan_task.scanner_context->task_handle(), false,
{split_runner});
+ auto result = _task_executor->enqueue_splits(task_handle,
false, {split_runner});
if (!result.has_value()) {
LOG(WARNING) << "enqueue_splits failed: " <<
result.error();
return result.error();
@@ -309,8 +324,7 @@ public:
if (split_runner == nullptr) {
return Status::OK();
}
- RETURN_IF_ERROR(_task_executor->re_enqueue_split(
- scan_task.scanner_context->task_handle(), false,
split_runner));
+ RETURN_IF_ERROR(_task_executor->re_enqueue_split(task_handle,
false, split_runner));
}
scan_task.scan_task->split_runner = split_runner;
return Status::OK();
diff --git
a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
index 20217e6cc3c..dc60da2e39d 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
@@ -47,6 +47,24 @@ extern ::doris::MetricPrototype
METRIC_thread_pool_task_execution_count_total;
extern ::doris::MetricPrototype
METRIC_thread_pool_task_wait_worker_time_ns_total;
extern ::doris::MetricPrototype
METRIC_thread_pool_task_wait_worker_count_total;
+namespace {
+
+Result<std::shared_ptr<TimeSharingTaskHandle>> get_time_sharing_task_handle(
+ const std::shared_ptr<TaskHandle>& task_handle, const char* operation)
{
+ if (task_handle == nullptr) {
+ return ResultError(Status::InternalError("{} got null task handle",
operation));
+ }
+
+ auto handle =
std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+ if (handle == nullptr) {
+ return ResultError(Status::InternalError("{} got invalid task handle
type, task id: {}",
+ operation,
task_handle->task_id().to_string()));
+ }
+ return handle;
+}
+
+} // namespace
+
SplitThreadPoolToken::SplitThreadPoolToken(TimeSharingTaskExecutor* pool,
TimeSharingTaskExecutor::ExecutionMode mode,
std::shared_ptr<SplitQueue>
split_queue,
@@ -743,7 +761,7 @@ Status TimeSharingTaskExecutor::add_task(const TaskId&
task_id,
}
Status TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle>
task_handle) {
- auto handle =
std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+ auto handle = DORIS_TRY(get_time_sharing_task_handle(task_handle,
"remove_task"));
std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
{
@@ -806,7 +824,11 @@ Result<std::vector<SharedListenableFuture<Void>>>
TimeSharingTaskExecutor::enque
}
}};
std::vector<SharedListenableFuture<Void>> finished_futures;
- auto handle =
std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+ auto handle_result = get_time_sharing_task_handle(task_handle,
"enqueue_splits");
+ if (!handle_result.has_value()) {
+ return ResultError(handle_result.error());
+ }
+ auto handle = handle_result.value();
{
std::unique_lock<std::mutex> lock(_mutex);
for (const auto& task_split : splits) {
@@ -839,7 +861,7 @@ Result<std::vector<SharedListenableFuture<Void>>>
TimeSharingTaskExecutor::enque
Status TimeSharingTaskExecutor::re_enqueue_split(std::shared_ptr<TaskHandle>
task_handle,
bool intermediate,
const
std::shared_ptr<SplitRunner>& split) {
- auto handle =
std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+ auto handle = DORIS_TRY(get_time_sharing_task_handle(task_handle,
"re_enqueue_split"));
std::shared_ptr<PrioritizedSplitRunner> prioritized_split =
handle->get_split(split, intermediate);
prioritized_split->reset_level_priority();
diff --git
a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
index 1636b04c425..792c01b3b23 100644
--- a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
+++ b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
@@ -26,6 +26,7 @@
#include <future>
#include <mutex>
#include <random>
+#include <string>
#include <thread>
#include "common/exception.h"
@@ -339,6 +340,20 @@ private:
std::atomic<bool> _finished {false};
};
+class TestingTaskHandle final : public TaskHandle {
+public:
+ explicit TestingTaskHandle(std::string task_id) :
_task_id(std::move(task_id)) {}
+
+ Status init() override { return Status::OK(); }
+
+ bool is_closed() const override { return false; }
+
+ TaskId task_id() const override { return _task_id; }
+
+private:
+ TaskId _task_id;
+};
+
class TimeSharingTaskExecutorTest : public testing::Test {
protected:
void SetUp() override {}
@@ -424,6 +439,48 @@ TEST_F(TimeSharingTaskExecutorTest,
test_remove_task_clears_queued_task_count) {
executor.stop();
}
+TEST_F(TimeSharingTaskExecutorTest, test_invalid_task_handle_returns_error) {
+ auto ticker = std::make_shared<TestingTicker>();
+
+ TimeSharingTaskExecutor::ThreadConfig thread_config;
+ thread_config.thread_name = "invalid_task_handle";
+ thread_config.workload_group = "normal";
+ TimeSharingTaskExecutor executor(thread_config, 0, 1, 1, ticker);
+ ASSERT_TRUE(executor.init().ok());
+
+ auto split = std::make_shared<QueueOnlySplitRunner>();
+
+ auto null_enqueue_result = executor.enqueue_splits(nullptr, false,
{split});
+ ASSERT_FALSE(null_enqueue_result.has_value());
+ EXPECT_NE(std::string(null_enqueue_result.error().msg()).find("null task
handle"),
+ std::string::npos);
+
+ Status null_re_enqueue_status = executor.re_enqueue_split(nullptr, false,
split);
+ ASSERT_FALSE(null_re_enqueue_status.ok());
+ EXPECT_NE(std::string(null_re_enqueue_status.msg()).find("null task
handle"),
+ std::string::npos);
+
+ Status null_remove_status = executor.remove_task(nullptr);
+ ASSERT_FALSE(null_remove_status.ok());
+ EXPECT_NE(std::string(null_remove_status.msg()).find("null task handle"),
std::string::npos);
+
+ auto invalid_task_handle =
std::make_shared<TestingTaskHandle>("invalid_task");
+ auto invalid_enqueue_result = executor.enqueue_splits(invalid_task_handle,
false, {split});
+ ASSERT_FALSE(invalid_enqueue_result.has_value());
+ EXPECT_NE(std::string(invalid_enqueue_result.error().msg()).find("invalid
task handle type"),
+ std::string::npos);
+
+ Status invalid_re_enqueue_status =
executor.re_enqueue_split(invalid_task_handle, false, split);
+ ASSERT_FALSE(invalid_re_enqueue_status.ok());
+ EXPECT_NE(std::string(invalid_re_enqueue_status.msg()).find("invalid task
handle type"),
+ std::string::npos);
+
+ Status invalid_remove_status = executor.remove_task(invalid_task_handle);
+ ASSERT_FALSE(invalid_remove_status.ok());
+ EXPECT_NE(std::string(invalid_remove_status.msg()).find("invalid task
handle type"),
+ std::string::npos);
+}
+
TEST_F(TimeSharingTaskExecutorTest, test_tasks_complete) {
auto ticker = std::make_shared<TestingTicker>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]