This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ff1012da3e9 [fix](be) Fix time-sharing executor queued task count (#63568)
ff1012da3e9 is described below

commit ff1012da3e9b708814898c458613c5a31acce415
Author: Raiden <[email protected]>
AuthorDate: Tue May 26 11:59:51 2026 +0800

    [fix](be) Fix time-sharing executor queued task count (#63568)
    
    ### What problem does this PR solve?
    
    Issue Number: N/A
    
    Related PR: N/A
    
    Problem Summary:
    
    `TimeSharingTaskExecutor` uses `_total_queued_tasks` for queue-size
    metrics and capacity checks. When queued splits were removed before
    execution (for example, when a task was cancelled or removed), they were
    dropped from the split queue, but `_total_queued_tasks` was not
    decremented.
    
    After repeated removals, `_total_queued_tasks` could become larger than
    the real queue size. This made the executor report a non-zero queue size
    even when there were no active or queued splits, and later submissions
    could be rejected as if the queue were full.
    
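    This PR routes all queue offer/remove operations through two helpers,
    `_offer_split_unlocked` and `_remove_queued_splits_unlocked`, which update
    `_total_queued_tasks` together with the split queue and token state.
    Splits re-queued by the dispatch thread are now counted as well, and
    removing the last queued split while no thread is active returns the
    token to the IDLE state.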
    
    ### Release note
    
    Fix a bug where the time-sharing scan executor queue size could become
    inaccurate after queued splits were removed before execution.
    
    ### Check List (For Author)
    
    - Test
        - [x] Regression test
        - [x] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason
    
    - Behavior changed:
        - [ ] No.
    - [x] Yes. Queued splits removed before execution now decrement executor
    queued-task accounting, so queue-size metrics and capacity checks
    reflect the real queued split count.
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes.
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label
---
 .../time_sharing/time_sharing_task_executor.cpp    | 44 ++++++++++--
 .../time_sharing/time_sharing_task_executor.h      |  9 +++
 .../time_sharing_task_executor_test.cpp            | 78 +++++++++++++++++++++-
 3 files changed, 123 insertions(+), 8 deletions(-)

diff --git 
a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp 
b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
index 32636a2cf8f..20217e6cc3c 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
@@ -276,7 +276,7 @@ TimeSharingTaskExecutor::~TimeSharingTaskExecutor() {
         }
         {
             std::unique_lock<std::mutex> l(_lock);
-            _tokenless->_entries->remove_all(splits_to_destroy);
+            _remove_queued_splits_unlocked(splits_to_destroy);
         }
     }
 
@@ -421,7 +421,7 @@ Status 
TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
     DCHECK(state == SplitThreadPoolToken::State::IDLE ||
            state == SplitThreadPoolToken::State::RUNNING);
     split->submit_time_watch().start();
-    _tokenless->_entries->offer(std::move(split));
+    _offer_split_unlocked(std::move(split));
     if (state == SplitThreadPoolToken::State::IDLE) {
         _tokenless->transition(SplitThreadPoolToken::State::RUNNING);
     }
@@ -433,8 +433,6 @@ Status 
TimeSharingTaskExecutor::_do_submit(std::shared_ptr<PrioritizedSplitRunne
     //    1. If it is a SERIAL token, and there are unsubmitted tasks, submit 
them to the queue.
     //    2. If it is a CONCURRENT token, and there are still unsubmitted 
tasks, and the upper limit of concurrency is not reached,
     //       then submitted to the queue.
-    _total_queued_tasks++;
-
     // Wake up an idle thread for this task. Choosing the thread at the front 
of
     // the list ensures LIFO semantics as idling threads are also added to the 
front.
     //
@@ -570,7 +568,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
                         lock.unlock();
                         l.lock();
                         if (_tokenless->state() == 
SplitThreadPoolToken::State::RUNNING) {
-                            _tokenless->_entries->offer(split);
+                            split->submit_time_watch().reset();
+                            _offer_split_unlocked(split);
                         }
                         l.unlock();
                     } else {
@@ -586,7 +585,8 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
                                 split->reset_level_priority();
                                 std::unique_lock<std::mutex> l(_lock);
                                 if (_tokenless->state() == 
SplitThreadPoolToken::State::RUNNING) {
-                                    _tokenless->_entries->offer(split);
+                                    split->submit_time_watch().reset();
+                                    _offer_split_unlocked(split);
                                 }
                             } else {
                                 LOG(WARNING) << "blocked split is failed, 
split_id: "
@@ -770,7 +770,7 @@ Status 
TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> task_han
         }
         {
             std::unique_lock<std::mutex> l(_lock);
-            _tokenless->_entries->remove_all(splits_to_destroy);
+            _remove_queued_splits_unlocked(splits_to_destroy);
         }
     }
 
@@ -846,6 +846,36 @@ Status 
TimeSharingTaskExecutor::re_enqueue_split(std::shared_ptr<TaskHandle> tas
     return _do_submit(prioritized_split);
 }
 
+void 
TimeSharingTaskExecutor::_offer_split_unlocked(std::shared_ptr<PrioritizedSplitRunner>
 split) {
+    _tokenless->_entries->offer(std::move(split));
+    ++_total_queued_tasks;
+}
+
+void TimeSharingTaskExecutor::_remove_queued_splits_unlocked(
+        const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& splits) {
+    if (splits.empty()) {
+        return;
+    }
+
+    const size_t queue_size_before = _tokenless->_entries->size();
+    _tokenless->_entries->remove_all(splits);
+    const size_t queue_size_after = _tokenless->_entries->size();
+    DCHECK_GE(queue_size_before, queue_size_after);
+
+    const auto removed = static_cast<int>(queue_size_before - 
queue_size_after);
+    DCHECK_GE(_total_queued_tasks, removed);
+    _total_queued_tasks -= removed;
+
+    if (_tokenless->state() == SplitThreadPoolToken::State::RUNNING &&
+        _tokenless->_active_threads == 0 && _tokenless->_entries->size() == 0) 
{
+        _tokenless->transition(SplitThreadPoolToken::State::IDLE);
+    }
+
+    if (_total_queued_tasks == 0 && _active_threads == 0) {
+        _idle_cond.notify_all();
+    }
+}
+
 void 
TimeSharingTaskExecutor::_split_finished(std::shared_ptr<PrioritizedSplitRunner>
 split,
                                               const Status& status) {
     _completed_splits_per_level[split->priority().level()]++;
diff --git 
a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h 
b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
index ba38ddb04da..13a42a1385c 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.h
@@ -294,6 +294,15 @@ private:
     // // Submits a task to be run via token.
     Status _do_submit(std::shared_ptr<PrioritizedSplitRunner> split);
 
+    // Offer a split to the executor queue and keep _total_queued_tasks 
consistent.
+    // REQUIRES: _lock is held.
+    void _offer_split_unlocked(std::shared_ptr<PrioritizedSplitRunner> split);
+
+    // Remove queued splits and keep _total_queued_tasks/token state 
consistent.
+    // REQUIRES: _lock is held.
+    void _remove_queued_splits_unlocked(
+            const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& 
splits);
+
     //NOTE: not thread safe, caller should keep it thread-safe by using lock
     Status _try_create_thread(int thread_num, std::lock_guard<std::mutex>&);
 
diff --git 
a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp 
b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
index b6f90daf79b..1636b04c425 100644
--- a/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
+++ b/be/test/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
@@ -314,13 +314,37 @@ private:
     Status _status;
 };
 
+class QueueOnlySplitRunner : public SplitRunner {
+public:
+    Status init() override { return Status::OK(); }
+
+    Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds) 
override {
+        _started = true;
+        _finished = true;
+        return SharedListenableFuture<Void>::create_ready();
+    }
+
+    void close(const Status& status) override {}
+
+    bool is_finished() override { return _finished.load(); }
+
+    Status finished_status() override { return Status::OK(); }
+
+    std::string get_info() const override { return "queue_only_split"; }
+
+    bool is_started() const { return _started.load(); }
+
+private:
+    std::atomic<bool> _started {false};
+    std::atomic<bool> _finished {false};
+};
+
 class TimeSharingTaskExecutorTest : public testing::Test {
 protected:
     void SetUp() override {}
 
     void TearDown() override {}
 
-private:
     template <typename Container>
     void assert_split_states(int end_index, const Container& splits) {
         for (int i = 0; i <= end_index; ++i) {
@@ -348,6 +372,58 @@ private:
     }
 };
 
+TEST_F(TimeSharingTaskExecutorTest, test_remove_task_clears_queued_task_count) 
{
+    auto ticker = std::make_shared<TestingTicker>();
+
+    TimeSharingTaskExecutor::ThreadConfig thread_config;
+    thread_config.thread_name = "leak_repro";
+    thread_config.workload_group = "normal";
+    thread_config.max_thread_num = 0;
+    thread_config.min_thread_num = 0;
+    thread_config.max_queue_size = 2;
+    TimeSharingTaskExecutor executor(thread_config, 0, 1, 1, ticker);
+    ASSERT_TRUE(executor.init().ok());
+    ASSERT_TRUE(executor.start().ok());
+
+    try {
+        for (int i = 0; i < thread_config.max_queue_size; ++i) {
+            auto task_handle = TEST_TRY(executor.create_task(
+                    TaskId("removed_task_" + std::to_string(i)), []() { return 
0.0; }, 1,
+                    std::chrono::milliseconds(1), std::optional<int>(1)));
+            auto split = std::make_shared<QueueOnlySplitRunner>();
+
+            auto enqueue_result = executor.enqueue_splits(task_handle, false, 
{split});
+            ASSERT_TRUE(enqueue_result.has_value()) << enqueue_result.error();
+            EXPECT_EQ(executor.waiting_splits_size(), 1);
+
+            ASSERT_TRUE(executor.remove_task(task_handle).ok());
+            EXPECT_FALSE(split->is_started());
+            EXPECT_EQ(executor.waiting_splits_size(), 0);
+            EXPECT_EQ(executor.get_queue_size(), 0);
+        }
+
+        EXPECT_EQ(executor.num_active_threads(), 0);
+        EXPECT_EQ(executor.waiting_splits_size(), 0);
+        EXPECT_EQ(executor.get_queue_size(), 0);
+
+        auto task_handle = TEST_TRY(executor.create_task(
+                TaskId("next_task"), []() { return 0.0; }, 1, 
std::chrono::milliseconds(1),
+                std::optional<int>(1)));
+        auto split = std::make_shared<QueueOnlySplitRunner>();
+
+        auto enqueue_result = executor.enqueue_splits(task_handle, false, 
{split});
+        ASSERT_TRUE(enqueue_result.has_value()) << enqueue_result.error();
+        EXPECT_EQ(executor.waiting_splits_size(), 1);
+        EXPECT_EQ(executor.get_queue_size(), 1);
+
+        static_cast<void>(executor.remove_task(task_handle));
+    } catch (...) {
+        executor.stop();
+        throw;
+    }
+    executor.stop();
+}
+
 TEST_F(TimeSharingTaskExecutorTest, test_tasks_complete) {
     auto ticker = std::make_shared<TestingTicker>();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to