This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 023b766f5ce [fix](be) Validate task executor scan handles (#65054)
023b766f5ce is described below

commit 023b766f5ce44199388da5fbca37614ff38cda74
Author: Gabriel <[email protected]>
AuthorDate: Wed Jul 1 14:17:23 2026 +0800

    [fix](be) Validate task executor scan handles (#65054)
    
    Task-executor scan scheduling could pass a null or
    invalid task handle into TimeSharingTaskExecutor. enqueue_splits and
    related paths cast the base TaskHandle to TimeSharingTaskHandle and
    immediately dereferenced the result, so a broken ScannerContext
    task-handle invariant caused BE to crash with SIGSEGV instead of
    returning a diagnostic error. This change validates scanner context,
    scan task, and task handle before submitting scan splits, and validates
    the task handle type at TimeSharingTaskExecutor entry points before
    dereferencing it.
---
 be/src/exec/scan/scanner_scheduler.h               | 22 +++++++--
 .../time_sharing/time_sharing_task_executor.cpp    | 28 +++++++++--
 .../time_sharing_task_executor_test.cpp            | 57 ++++++++++++++++++++++
 3 files changed, 100 insertions(+), 7 deletions(-)

diff --git a/be/src/exec/scan/scanner_scheduler.h 
b/be/src/exec/scan/scanner_scheduler.h
index ad7ad09e239..bc4ac2c8075 100644
--- a/be/src/exec/scan/scanner_scheduler.h
+++ b/be/src/exec/scan/scanner_scheduler.h
@@ -292,13 +292,28 @@ public:
 
     Status submit_scan_task(SimplifiedScanTask scan_task) override {
         if (!_is_stop) {
+            if (scan_task.scanner_context == nullptr) {
+                return Status::InternalError<false>("scanner pool {} got null 
scanner context.",
+                                                    _sched_name);
+            }
+            if (scan_task.scan_task == nullptr) {
+                return Status::InternalError<false>("scanner pool {} got null 
scan task.",
+                                                    _sched_name);
+            }
+            auto task_handle = scan_task.scanner_context->task_handle();
+            if (task_handle == nullptr) {
+                return Status::InternalError<false>(
+                        "scanner pool {} got null task handle, scan task first 
schedule: {}, "
+                        "scanner context: {}",
+                        _sched_name, scan_task.scan_task->is_first_schedule,
+                        scan_task.scanner_context->debug_string());
+            }
             std::shared_ptr<SplitRunner> split_runner;
             if (scan_task.scan_task->is_first_schedule) {
                 split_runner = 
std::make_shared<ScannerSplitRunner>("scanner_split_runner",
                                                                     
scan_task.scan_func);
                 RETURN_IF_ERROR(split_runner->init());
-                auto result = _task_executor->enqueue_splits(
-                        scan_task.scanner_context->task_handle(), false, 
{split_runner});
+                auto result = _task_executor->enqueue_splits(task_handle, 
false, {split_runner});
                 if (!result.has_value()) {
                     LOG(WARNING) << "enqueue_splits failed: " << 
result.error();
                     return result.error();
@@ -309,8 +324,7 @@ public:
                 if (split_runner == nullptr) {
                     return Status::OK();
                 }
-                RETURN_IF_ERROR(_task_executor->re_enqueue_split(
-                        scan_task.scanner_context->task_handle(), false, 
split_runner));
+                RETURN_IF_ERROR(_task_executor->re_enqueue_split(task_handle, 
false, split_runner));
             }
             scan_task.scan_task->split_runner = split_runner;
             return Status::OK();
diff --git 
a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp 
b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
index 20217e6cc3c..dc60da2e39d 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
@@ -47,6 +47,24 @@ extern ::doris::MetricPrototype 
METRIC_thread_pool_task_execution_count_total;
 extern ::doris::MetricPrototype 
METRIC_thread_pool_task_wait_worker_time_ns_total;
 extern ::doris::MetricPrototype 
METRIC_thread_pool_task_wait_worker_count_total;
 
+namespace {
+
+Result<std::shared_ptr<TimeSharingTaskHandle>> get_time_sharing_task_handle(
+        const std::shared_ptr<TaskHandle>& task_handle, const char* operation) 
{
+    if (task_handle == nullptr) {
+        return ResultError(Status::InternalError("{} got null task handle", 
operation));
+    }
+
+    auto handle = 
std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+    if (handle == nullptr) {
+        return ResultError(Status::InternalError("{} got invalid task handle 
type, task id: {}",
+                                                 operation, 
task_handle->task_id().to_string()));
+    }
+    return handle;
+}
+
+} // namespace
+
 SplitThreadPoolToken::SplitThreadPoolToken(TimeSharingTaskExecutor* pool,
                                            
TimeSharingTaskExecutor::ExecutionMode mode,
                                            std::shared_ptr<SplitQueue> 
split_queue,
@@ -743,7 +761,7 @@ Status TimeSharingTaskExecutor::add_task(const TaskId& 
task_id,
 }
 
 Status TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> 
task_handle) {
-    auto handle = 
std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+    auto handle = DORIS_TRY(get_time_sharing_task_handle(task_handle, 
"remove_task"));
     std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
 
     {
@@ -806,7 +824,11 @@ Result<std::vector<SharedListenableFuture<Void>>> 
TimeSharingTaskExecutor::enque
         }
     }};
     std::vector<SharedListenableFuture<Void>> finished_futures;
-    auto handle = 
std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+    auto handle_result = get_time_sharing_task_handle(task_handle, 
"enqueue_splits");
+    if (!handle_result.has_value()) {
+        return ResultError(handle_result.error());
+    }
+    auto handle = handle_result.value();
     {
         std::unique_lock<std::mutex> lock(_mutex);
         for (const auto& task_split : splits) {
@@ -839,7 +861,7 @@ Result<std::vector<SharedListenableFuture<Void>>> 
TimeSharingTaskExecutor::enque
 Status TimeSharingTaskExecutor::re_enqueue_split(std::shared_ptr<TaskHandle> 
task_handle,
                                                  bool intermediate,
                                                  const 
std::shared_ptr<SplitRunner>& split) {
-    auto handle = 
std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+    auto handle = DORIS_TRY(get_time_sharing_task_handle(task_handle, 
"re_enqueue_split"));
     std::shared_ptr<PrioritizedSplitRunner> prioritized_split =
             handle->get_split(split, intermediate);
     prioritized_split->reset_level_priority();
diff --git 
a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp 
b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
index 1636b04c425..792c01b3b23 100644
--- a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
+++ b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
@@ -26,6 +26,7 @@
 #include <future>
 #include <mutex>
 #include <random>
+#include <string>
 #include <thread>
 
 #include "common/exception.h"
@@ -339,6 +340,20 @@ private:
     std::atomic<bool> _finished {false};
 };
 
+class TestingTaskHandle final : public TaskHandle {
+public:
+    explicit TestingTaskHandle(std::string task_id) : 
_task_id(std::move(task_id)) {}
+
+    Status init() override { return Status::OK(); }
+
+    bool is_closed() const override { return false; }
+
+    TaskId task_id() const override { return _task_id; }
+
+private:
+    TaskId _task_id;
+};
+
 class TimeSharingTaskExecutorTest : public testing::Test {
 protected:
     void SetUp() override {}
@@ -424,6 +439,48 @@ TEST_F(TimeSharingTaskExecutorTest, 
test_remove_task_clears_queued_task_count) {
     executor.stop();
 }
 
+TEST_F(TimeSharingTaskExecutorTest, test_invalid_task_handle_returns_error) {
+    auto ticker = std::make_shared<TestingTicker>();
+
+    TimeSharingTaskExecutor::ThreadConfig thread_config;
+    thread_config.thread_name = "invalid_task_handle";
+    thread_config.workload_group = "normal";
+    TimeSharingTaskExecutor executor(thread_config, 0, 1, 1, ticker);
+    ASSERT_TRUE(executor.init().ok());
+
+    auto split = std::make_shared<QueueOnlySplitRunner>();
+
+    auto null_enqueue_result = executor.enqueue_splits(nullptr, false, 
{split});
+    ASSERT_FALSE(null_enqueue_result.has_value());
+    EXPECT_NE(std::string(null_enqueue_result.error().msg()).find("null task 
handle"),
+              std::string::npos);
+
+    Status null_re_enqueue_status = executor.re_enqueue_split(nullptr, false, 
split);
+    ASSERT_FALSE(null_re_enqueue_status.ok());
+    EXPECT_NE(std::string(null_re_enqueue_status.msg()).find("null task 
handle"),
+              std::string::npos);
+
+    Status null_remove_status = executor.remove_task(nullptr);
+    ASSERT_FALSE(null_remove_status.ok());
+    EXPECT_NE(std::string(null_remove_status.msg()).find("null task handle"), 
std::string::npos);
+
+    auto invalid_task_handle = 
std::make_shared<TestingTaskHandle>("invalid_task");
+    auto invalid_enqueue_result = executor.enqueue_splits(invalid_task_handle, 
false, {split});
+    ASSERT_FALSE(invalid_enqueue_result.has_value());
+    EXPECT_NE(std::string(invalid_enqueue_result.error().msg()).find("invalid 
task handle type"),
+              std::string::npos);
+
+    Status invalid_re_enqueue_status = 
executor.re_enqueue_split(invalid_task_handle, false, split);
+    ASSERT_FALSE(invalid_re_enqueue_status.ok());
+    EXPECT_NE(std::string(invalid_re_enqueue_status.msg()).find("invalid task 
handle type"),
+              std::string::npos);
+
+    Status invalid_remove_status = executor.remove_task(invalid_task_handle);
+    ASSERT_FALSE(invalid_remove_status.ok());
+    EXPECT_NE(std::string(invalid_remove_status.msg()).find("invalid task 
handle type"),
+              std::string::npos);
+}
+
 TEST_F(TimeSharingTaskExecutorTest, test_tasks_complete) {
     auto ticker = std::make_shared<TestingTicker>();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to