This is an automated email from the ASF dual-hosted git repository.

BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ce61c8dfe99 [Chore](be) Avoid BE crash on exception (#63312)
ce61c8dfe99 is described below

commit ce61c8dfe99a5a2648c7980b5d0396977cdf9eb0
Author: Pxl <[email protected]>
AuthorDate: Mon May 18 19:31:22 2026 +0800

    [Chore](be) Avoid BE crash on exception (#63312)
    
    This pull request improves error handling and robustness in the
    `PrioritizedSplitRunner` and related time-sharing task execution code.
    It ensures that exceptions thrown during split processing are correctly
    captured and propagated, and adds testing to verify this behavior.
    
    **Error handling improvements:**
    
    * Updated `PrioritizedSplitRunner::process()` to catch `Exception`
    thrown by `process_for`, convert it to a `Status`, set the error on
    `_finished_future`, and return the error result, so a thrown
    `Exception` no longer escapes `process()`. An error `Result` returned
    by `process_for` is now recorded on `_finished_future` as well.
    * Modified the `close` and `is_finished` methods of
    `PrioritizedSplitRunner` to complete `_finished_future` with a value
    when the relevant status is OK and with that error status otherwise
    (the `status` argument for `close`, the split runner's
    `finished_status()` for `is_finished`).
    
[[1]](diffhunk://#diff-eabed95efc8325f788b423182bac8563fa77c0f995683b76fb9628757b210269R64-R68)
    
[[2]](diffhunk://#diff-eabed95efc8325f788b423182bac8563fa77c0f995683b76fb9628757b210269R79-R84)
    
    **Testing enhancements:**
    
    * Added a `ThrowingSplitRunner` test class that throws an exception in
    `process_for`, and a new test `test_process_exception_returns_error` to
    verify that exceptions are properly handled and reflected in the
    runner's future.
    
[[1]](diffhunk://#diff-fa509c2ede062320aa3c657810303ee8d04b70ec095a77d3efca3ffbd4b2eb6aR295-R316)
    
[[2]](diffhunk://#diff-fa509c2ede062320aa3c657810303ee8d04b70ec095a77d3efca3ffbd4b2eb6aR446-R463)
    
    **Code quality improvements:**
    
    * Included missing `common/exception.h` header in both implementation
    and test files for proper exception handling.
    
[[1]](diffhunk://#diff-eabed95efc8325f788b423182bac8563fa77c0f995683b76fb9628757b210269R26)
    
[[2]](diffhunk://#diff-fa509c2ede062320aa3c657810303ee8d04b70ec095a77d3efca3ffbd4b2eb6aR31-R34)
    * Used `std::move` when initializing `_scan_func` in
    `ScannerSplitRunner` to avoid copying the by-value `std::function`
    constructor parameter.
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 be/src/exec/scan/scanner_scheduler.h               |  2 +-
 .../time_sharing/prioritized_split_runner.cpp      | 26 +++++++++++--
 .../time_sharing_task_executor_test.cpp            | 43 ++++++++++++++++++++++
 3 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/scan/scanner_scheduler.h 
b/be/src/exec/scan/scanner_scheduler.h
index de1553b026e..ad7ad09e239 100644
--- a/be/src/exec/scan/scanner_scheduler.h
+++ b/be/src/exec/scan/scanner_scheduler.h
@@ -62,7 +62,7 @@ struct SimplifiedScanTask {
 class ScannerSplitRunner : public SplitRunner {
 public:
     ScannerSplitRunner(std::string name, std::function<bool()> scan_func)
-            : _name(std::move(name)), _scan_func(scan_func), _started(false) {}
+            : _name(std::move(name)), _scan_func(std::move(scan_func)), 
_started(false) {}
 
     Status init() override { return Status::OK(); }
 
diff --git 
a/be/src/exec/scan/task_executor/time_sharing/prioritized_split_runner.cpp 
b/be/src/exec/scan/task_executor/time_sharing/prioritized_split_runner.cpp
index 032ba00473b..e26e399d435 100644
--- a/be/src/exec/scan/task_executor/time_sharing/prioritized_split_runner.cpp
+++ b/be/src/exec/scan/task_executor/time_sharing/prioritized_split_runner.cpp
@@ -23,6 +23,7 @@
 #include <functional>
 #include <thread>
 
+#include "common/exception.h"
 #include "exec/scan/task_executor/time_sharing/time_sharing_task_handle.h"
 
 namespace doris {
@@ -60,6 +61,11 @@ bool PrioritizedSplitRunner::is_closed() const {
 void PrioritizedSplitRunner::close(const Status& status) {
     if (!_closed.exchange(true)) {
         _split_runner->close(status);
+        if (status.ok()) {
+            _finished_future.set_value({});
+        } else {
+            _finished_future.set_error(status);
+        }
     }
 }
 
@@ -70,7 +76,12 @@ int64_t PrioritizedSplitRunner::created_nanos() const {
 bool PrioritizedSplitRunner::is_finished() {
     bool finished = _split_runner->is_finished();
     if (finished) {
-        _finished_future.set_value({});
+        auto status = _split_runner->finished_status();
+        if (status.ok()) {
+            _finished_future.set_value({});
+        } else {
+            _finished_future.set_error(status);
+        }
     }
     return finished || _closed.load() || _task_handle->is_closed();
 }
@@ -99,9 +110,18 @@ Result<SharedListenableFuture<Void>> 
PrioritizedSplitRunner::process() {
     _wait_nanos.fetch_add(start_nanos - _last_ready.load());
 
     auto process_start_time = _ticker->read();
-    auto blocked = _split_runner->process_for(SPLIT_RUN_QUANTA);
+    Result<SharedListenableFuture<Void>> blocked = 
SharedListenableFuture<Void>::create_ready();
+    try {
+        blocked = _split_runner->process_for(SPLIT_RUN_QUANTA);
+    } catch (const Exception& e) {
+        Status status = e.to_status();
+        _finished_future.set_error(status);
+        return unexpected(status);
+    }
     if (!blocked.has_value()) {
-        return unexpected(std::move(blocked).error());
+        auto error_status = blocked.error();
+        _finished_future.set_error(error_status);
+        return unexpected(std::move(error_status));
     }
     auto process_end_time = _ticker->read();
     auto quanta_scheduled_nanos = process_end_time - process_start_time;
diff --git 
a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp 
b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
index 0459121408e..b6f90daf79b 100644
--- a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
+++ b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
@@ -28,7 +28,10 @@
 #include <random>
 #include <thread>
 
+#include "common/exception.h"
 #include "exec/scan/task_executor/ticker.h"
+#include "exec/scan/task_executor/time_sharing/multilevel_split_queue.h"
+#include "exec/scan/task_executor/time_sharing/prioritized_split_runner.h"
 #include "exec/scan/task_executor/time_sharing/time_sharing_task_handle.h"
 
 namespace doris {
@@ -289,6 +292,28 @@ private:
     ListenableFuture<Void> _completion_future {};
 };
 
+class ThrowingSplitRunner : public SplitRunner {
+public:
+    explicit ThrowingSplitRunner(Status status) : _status(std::move(status)) {}
+
+    Status init() override { return Status::OK(); }
+
+    Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds) 
override {
+        throw Exception(_status);
+    }
+
+    void close(const Status& status) override {}
+
+    bool is_finished() override { return false; }
+
+    Status finished_status() override { return _status; }
+
+    std::string get_info() const override { return ""; }
+
+private:
+    Status _status;
+};
+
 class TimeSharingTaskExecutorTest : public testing::Test {
 protected:
     void SetUp() override {}
@@ -418,6 +443,24 @@ TEST_F(TimeSharingTaskExecutorTest, test_tasks_complete) {
     executor.stop();
 }
 
+TEST(PrioritizedSplitRunnerTest, test_process_exception_returns_error) {
+    auto ticker = std::make_shared<TestingTicker>();
+    auto task_handle = TimeSharingTaskHandle::create_shared(
+            TaskId("test_exception_task"), 
std::make_shared<MultilevelSplitQueue>(2.0),
+            []() { return 0.0; }, 1, std::chrono::milliseconds(1), 
std::nullopt);
+    ASSERT_TRUE(task_handle->init().ok());
+    auto split =
+            std::make_shared<ThrowingSplitRunner>(Status::InternalError("split 
process failed"));
+    PrioritizedSplitRunner prioritized_split(task_handle, 0, split, ticker);
+
+    auto result = prioritized_split.process();
+    ASSERT_FALSE(result.has_value());
+    EXPECT_NE(std::string::npos, result.error().to_string().find("split 
process failed"));
+    EXPECT_TRUE(prioritized_split.finished_future().is_error());
+    EXPECT_NE(std::string::npos, 
prioritized_split.finished_future().get_status().to_string().find(
+                                         "split process failed"));
+}
+
 TEST_F(TimeSharingTaskExecutorTest, test_quanta_fairness) {
     constexpr int TOTAL_PHASES = 11;
     std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to