This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 46d40b19525 [refactor](executor)Remove empty group logic #26005
46d40b19525 is described below

commit 46d40b195258d8abc34d56dc177d209d9873b07f
Author: wangbo <[email protected]>
AuthorDate: Fri Oct 27 14:24:41 2023 +0800

    [refactor](executor)Remove empty group logic #26005
---
 be/src/common/config.cpp                   |  1 -
 be/src/common/config.h                     |  1 -
 be/src/pipeline/pipeline_task.cpp          | 23 ++-------------
 be/src/pipeline/pipeline_task.h            | 20 -------------
 be/src/pipeline/task_queue.cpp             | 46 ++---------------------------
 be/src/pipeline/task_queue.h               |  9 ------
 be/src/pipeline/task_scheduler.cpp         |  4 ---
 be/src/runtime/task_group/task_group.cpp   | 17 +----------
 be/src/runtime/task_group/task_group.h     | 10 -------
 be/src/vec/exec/scan/scan_task_queue.cpp   | 47 ++----------------------------
 be/src/vec/exec/scan/scan_task_queue.h     |  9 ------
 be/src/vec/exec/scan/scanner_scheduler.cpp | 20 ++++---------
 be/src/vec/exec/scan/scanner_scheduler.h   |  4 ---
 13 files changed, 13 insertions(+), 198 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 58121768723..5b6e393feed 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1110,7 +1110,6 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
 
 // cgroup
 DEFINE_String(doris_cgroup_cpu_path, "");
-DEFINE_Bool(enable_cpu_hard_limit, "false");
 
 DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f8350d87316..71c0a1e12c4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1177,7 +1177,6 @@ DECLARE_mBool(exit_on_exception);
 
 // cgroup
 DECLARE_String(doris_cgroup_cpu_path);
-DECLARE_Bool(enable_cpu_hard_limit);
 // This config controls whether the s3 file writer would flush cache asynchronously
 DECLARE_Bool(enable_flush_file_cache_async);
 
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 7e907b318d2..111259062cf 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -215,16 +215,6 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) {
     _task_queue = task_queue;
 }
 
-void PipelineTask::yield() {
-    int64_t time_spent = 0;
-    Defer defer {[&]() {
-        time_spent = time_spent * _core_num / _total_query_thread_num;
-        _task_queue->update_statistics(this, time_spent);
-    }};
-    SCOPED_RAW_TIMER(&time_spent);
-    usleep(THREAD_TIME_SLICE_US);
-}
-
 Status PipelineTask::execute(bool* eos) {
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_CPU_TIMER(_task_cpu_timer);
@@ -232,12 +222,8 @@ Status PipelineTask::execute(bool* eos) {
     SCOPED_ATTACH_TASK(_state);
     int64_t time_spent = 0;
 
-    // todo(wb) use a more lightweight timer
-    RuntimeProfile::Counter tmp_timer(TUnit::TIME_NS);
-
     Defer defer {[&]() {
         if (_task_queue) {
-            time_spent = tmp_timer.value();
             _task_queue->update_statistics(this, time_spent);
         }
     }};
@@ -245,7 +231,7 @@ Status PipelineTask::execute(bool* eos) {
     *eos = false;
     if (!_opened) {
         {
-            SCOPED_CPU_TIMER(&tmp_timer);
+            SCOPED_RAW_TIMER(&time_spent);
             auto st = _open();
             if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
                 set_state(PipelineTaskState::BLOCKED_FOR_RF);
@@ -280,12 +266,12 @@ Status PipelineTask::execute(bool* eos) {
             set_state(PipelineTaskState::BLOCKED_FOR_SINK);
             break;
         }
-        if (tmp_timer.value() > THREAD_TIME_SLICE) {
+        if (time_spent > THREAD_TIME_SLICE) {
             COUNTER_UPDATE(_yield_counts, 1);
             break;
         }
         // TODO llj: Pipeline entity should_yield
-        SCOPED_CPU_TIMER(&tmp_timer);
+        SCOPED_RAW_TIMER(&time_spent);
         _block->clear_column_data(_root->row_desc().num_materialized_slots());
         auto* block = _block.get();
 
@@ -477,9 +463,6 @@ std::string PipelineTask::debug_string() {
 }
 
 taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity() const {
-    if (_is_empty_task) {
-        return _empty_group_entity;
-    }
     return _fragment_context->get_task_group_entity();
 }
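
The hunks above drop the temporary RuntimeProfile::Counter CPU timer and return to accumulating elapsed wall-clock time directly into time_spent via SCOPED_RAW_TIMER, the value the Defer block reports to the task queue and that execute() compares against THREAD_TIME_SLICE. As a rough sketch of that scope-guard pattern (a simplified stand-in, not Doris's actual macro):

    #include <chrono>
    #include <cstdint>

    // Minimal stand-in for a SCOPED_RAW_TIMER-style guard: on destruction it
    // adds the wall-clock nanoseconds spent in the enclosing scope to *counter.
    class ScopedRawTimer {
    public:
        explicit ScopedRawTimer(int64_t* counter)
                : _counter(counter), _start(std::chrono::steady_clock::now()) {}
        ~ScopedRawTimer() {
            auto elapsed = std::chrono::steady_clock::now() - _start;
            *_counter += std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
        }
    private:
        int64_t* _counter;
        std::chrono::steady_clock::time_point _start;
    };

Every timed scope in execute() keeps adding into the same time_spent counter, so once the accumulated value exceeds THREAD_TIME_SLICE (100'000'000 ns, i.e. 100 ms) the task yields and the queue statistics are updated with the total.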
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 99e41421bb3..ef923868a51 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -117,8 +117,6 @@ public:
                  PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile);
     virtual ~PipelineTask() = default;
 
-    PipelineTask() = default;
-
     virtual Status prepare(RuntimeState* state);
 
     virtual Status execute(bool* eos);
@@ -198,7 +196,6 @@ public:
     TaskQueue* get_task_queue() { return _task_queue; }
 
     static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;
-    static constexpr auto THREAD_TIME_SLICE_US = 100000L; // 100ms
 
     // 1 used for update priority queue
     // note(wb) an ugly implementation, need refactor later
@@ -252,17 +249,6 @@ public:
 
     TUniqueId instance_id() const { return _state->fragment_instance_id(); }
 
-    void set_empty_task(bool is_empty_task) { _is_empty_task = is_empty_task; }
-
-    bool is_empty_task() const { return _is_empty_task; }
-
-    void yield();
-
-    void set_task_group_entity(
-            taskgroup::TaskGroupEntity<std::queue<pipeline::PipelineTask*>>* empty_group_entity) {
-        _empty_group_entity = empty_group_entity;
-    }
-
 protected:
     void _finish_p_dependency() {
         for (const auto& p : _pipeline->_parents) {
@@ -302,12 +288,6 @@ protected:
 
     bool _try_close_flag = false;
 
-    bool _is_empty_task = false;
-    taskgroup::TaskGroupEntity<std::queue<pipeline::PipelineTask*>>* _empty_group_entity;
-    int _core_num = CpuInfo::num_cores();
-    int _total_query_thread_num =
-            config::doris_scanner_thread_pool_thread_num + config::pipeline_executor_size;
-
     RuntimeProfile* _parent_profile;
     std::unique_ptr<RuntimeProfile> _task_profile;
     RuntimeProfile::Counter* _task_cpu_timer;
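
The members removed here (_is_empty_task, _empty_group_entity, _core_num, _total_query_thread_num) only existed to support the deleted yield() path, which charged the placeholder task's sleep back to the queue scaled by _core_num / _total_query_thread_num. A small worked example of that scaling with made-up values (the thread counts below are illustrative, not the config defaults):

    #include <cstdint>
    #include <cstdio>

    int main() {
        // Illustrative values; the real ones came from CpuInfo::num_cores() and
        // config::doris_scanner_thread_pool_thread_num + config::pipeline_executor_size.
        int64_t core_num = 16;
        int64_t total_query_thread_num = 64;
        int64_t slept_ns = 100'000'000;  // the 100 ms usleep in the removed yield()

        // Only the core_num/total fraction of the sleep was charged as "work"
        // against the empty group's vruntime via update_statistics().
        int64_t charged_ns = slept_ns * core_num / total_query_thread_num;  // 25'000'000 ns here
        std::printf("charged %lld of %lld ns\n", (long long)charged_ns, (long long)slept_ns);
        return 0;
    }
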
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 366e299f7b5..a68a2ba4a7f 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -222,17 +222,9 @@ bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
 }
 
 TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size)
-        : TaskQueue(core_size), _min_tg_entity(nullptr) {
-    _empty_pip_task->set_empty_task(true);
-    _empty_pip_task->set_task_queue(this);
-    _empty_pip_task->set_task_group_entity(_empty_group_entity);
-    _empty_group_entity->set_empty_group_entity(true);
-}
+        : TaskQueue(core_size), _min_tg_entity(nullptr) {}
 
-TaskGroupTaskQueue::~TaskGroupTaskQueue() {
-    delete _empty_group_entity;
-    delete _empty_pip_task;
-}
+TaskGroupTaskQueue::~TaskGroupTaskQueue() = default;
 
 void TaskGroupTaskQueue::close() {
     std::unique_lock<std::mutex> lock(_rs_mutex);
@@ -256,9 +248,6 @@ Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
     entity->task_queue()->emplace(task);
     if (_group_entities.find(entity) == _group_entities.end()) {
         _enqueue_task_group<from_executor>(entity);
-        if (_enable_cpu_hard_limit) {
-            reset_empty_group_entity();
-        }
     }
     _wait_task.notify_one();
     return Status::OK();
@@ -281,15 +270,9 @@ PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
             }
         }
     }
-    if (entity->is_empty_group_entity()) {
-        return _empty_pip_task;
-    }
     DCHECK(entity->task_size() > 0);
     if (entity->task_size() == 1) {
         _dequeue_task_group(entity);
-        if (_enable_cpu_hard_limit) {
-            reset_empty_group_entity();
-        }
     }
     auto task = entity->task_queue()->front();
     if (task) {
@@ -391,30 +374,5 @@ void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& tas
     }
 }
 
-void TaskGroupTaskQueue::reset_empty_group_entity() {
-    int user_g_cpu_hard_limit = 0;
-    bool contains_empty_group = false;
-    for (auto* entity : _group_entities) {
-        if (!entity->is_empty_group_entity()) {
-            user_g_cpu_hard_limit += entity->cpu_share();
-        } else {
-            contains_empty_group = true;
-        }
-    }
-
-    // 0 <= user_g_cpu_hard_limit <= 100, bound by FE
-    // user_g_cpu_hard_limit = 0 means no group exists
-    int empty_group_cpu_share = 100 - user_g_cpu_hard_limit;
-    if (empty_group_cpu_share > 0 && empty_group_cpu_share < 100 && !contains_empty_group) {
-        _empty_group_entity->update_empty_cpu_share(empty_group_cpu_share);
-        _enqueue_task_group<true>(_empty_group_entity);
-    } else if ((empty_group_cpu_share == 0 || empty_group_cpu_share == 100) &&
-               contains_empty_group) {
-        // no need to update empty group here
-        // only update empty group's cpu share when exec enqueue
-        _dequeue_task_group(_empty_group_entity);
-    }
-}
-
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
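
For context, the deleted reset_empty_group_entity() gave the synthetic "empty group" whatever CPU hard limit the user groups left unused, and enqueued it only when that remainder was strictly between 0 and 100. A standalone sketch of the arithmetic (simplified; the real code walked the _group_entities set and updated the entity in place):

    #include <cstdio>
    #include <vector>

    int main() {
        // Illustrative per-group CPU hard limits; FE bounds their sum to <= 100.
        std::vector<int> user_group_cpu_hard_limits = {30, 25};

        int user_total = 0;
        for (int share : user_group_cpu_hard_limits) {
            user_total += share;
        }

        // The removed logic: the empty group soaked up the leftover share, but only
        // when some share was left over and at least one user group existed.
        int empty_group_cpu_share = 100 - user_total;  // 45 in this example
        if (empty_group_cpu_share > 0 && empty_group_cpu_share < 100) {
            std::printf("enqueue empty group with cpu share %d\n", empty_group_cpu_share);
        } else {
            std::printf("dequeue empty group (leftover share %d)\n", empty_group_cpu_share);
        }
        return 0;
    }
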
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index e6ed54417fd..d693cbe2168 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -187,8 +187,6 @@ public:
     void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
                              taskgroup::TGPTEntityPtr entity) override;
 
-    void reset_empty_group_entity();
-
 private:
     template <bool from_executor>
     Status _push_back(PipelineTask* task);
@@ -211,13 +209,6 @@ private:
     int _total_cpu_share = 0;
     std::atomic<taskgroup::TGPTEntityPtr> _min_tg_entity = nullptr;
     uint64_t _min_tg_v_runtime_ns = 0;
-
-    // empty group
-    taskgroup::TaskGroupEntity<std::queue<pipeline::PipelineTask*>>* _empty_group_entity =
-            new taskgroup::TaskGroupEntity<std::queue<pipeline::PipelineTask*>>();
-    PipelineTask* _empty_pip_task = new PipelineTask();
-    // todo(wb) support auto-switch cpu mode between soft limit and hard limit
-    bool _enable_cpu_hard_limit = config::enable_cpu_hard_limit;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index cdd934d5c7d..d1978782554 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -222,10 +222,6 @@ void TaskScheduler::_do_work(size_t index) {
         if (!task) {
             continue;
         }
-        if (task->is_empty_task()) {
-            task->yield();
-            continue;
-        }
         task->set_task_queue(_task_queue.get());
         auto* fragment_ctx = task->fragment_context();
         signal::query_id_hi = fragment_ctx->get_query_id().hi;
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index 7abc08d3c27..37e4b9ae597 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -84,7 +84,7 @@ uint64_t TaskGroupEntity<QueueType>::cpu_share() const {
 
 template <typename QueueType>
 uint64_t TaskGroupEntity<QueueType>::task_group_id() const {
-    return _is_empty_group_entity ? -1 : _tg->id();
+    return _tg->id();
 }
 
 template <typename QueueType>
@@ -101,21 +101,6 @@ std::string TaskGroupEntity<QueueType>::debug_string() const {
                        _tg->id(), _tg->name(), _type, cpu_share(), task_size(), _vruntime_ns);
 }
 
-template <typename QueueType>
-void TaskGroupEntity<QueueType>::set_empty_group_entity(bool is_empty_group_entity) {
-    _is_empty_group_entity = is_empty_group_entity;
-}
-
-template <typename QueueType>
-bool TaskGroupEntity<QueueType>::is_empty_group_entity() {
-    return _is_empty_group_entity;
-}
-
-template <typename QueueType>
-void TaskGroupEntity<QueueType>::update_empty_cpu_share(uint64_t empty_cpu_share) {
-    _cpu_share = empty_cpu_share;
-}
-
 template class TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
 template class TaskGroupEntity<ScanTaskQueue>;
 
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index 1dc634469ee..a948bf53ec0 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -50,8 +50,6 @@ public:
     explicit TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type);
     ~TaskGroupEntity();
 
-    TaskGroupEntity() = default; // used for empty group entity
-
     uint64_t vruntime_ns() const { return _vruntime_ns; }
 
     QueueType* task_queue();
@@ -70,12 +68,6 @@ public:
 
     void check_and_update_cpu_share(const TaskGroupInfo& tg_info);
 
-    void set_empty_group_entity(bool is_empty_group_entity);
-
-    bool is_empty_group_entity();
-
-    void update_empty_cpu_share(uint64_t empty_cpu_share);
-
 private:
     QueueType* _task_queue;
 
@@ -89,8 +81,6 @@ private:
     // independent updates.
     int64_t _version;
     uint64_t _cpu_share;
-
-    bool _is_empty_group_entity = false;
 };
 
 // TODO llj tg use PriorityTaskQueue to replace std::queue
diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp
index 90171bc87c4..89235b6b7a9 100644
--- a/be/src/vec/exec/scan/scan_task_queue.cpp
+++ b/be/src/vec/exec/scan/scan_task_queue.cpp
@@ -53,16 +53,8 @@ bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) {
     return r;
 }
 
-ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) {
-    _empty_scan_task->scan_entity = _empty_group_entity;
-    _empty_scan_task->is_empty_task = true;
-    _empty_group_entity->set_empty_group_entity(true);
-}
-
-ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() {
-    delete _empty_group_entity;
-    delete _empty_scan_task;
-}
+ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) {}
+ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default;
 
 void ScanTaskTaskGroupQueue::close() {
     std::unique_lock<std::mutex> lock(_rs_mutex);
@@ -86,16 +78,9 @@ bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) {
             }
         }
     }
-    if (entity->is_empty_group_entity()) {
-        *scan_task = *_empty_scan_task;
-        return true;
-    }
     DCHECK(entity->task_size() > 0);
     if (entity->task_size() == 1) {
         _dequeue_task_group(entity);
-        if (_enable_cpu_hard_limit) {
-            reset_empty_group_entity();
-        }
     }
     return entity->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
 }
@@ -110,9 +95,6 @@ bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
     }
     if (_group_entities.find(entity) == _group_entities.end()) {
         _enqueue_task_group(entity);
-        if (_enable_cpu_hard_limit) {
-            reset_empty_group_entity();
-        }
     }
     _wait_task.notify_one();
     return true;
@@ -150,31 +132,6 @@ void ScanTaskTaskGroupQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo&
     }
 }
 
-void ScanTaskTaskGroupQueue::reset_empty_group_entity() {
-    int user_g_cpu_hard_limit = 0;
-    bool contains_empty_group = false;
-    for (auto* entity : _group_entities) {
-        if (!entity->is_empty_group_entity()) {
-            user_g_cpu_hard_limit += entity->cpu_share();
-        } else {
-            contains_empty_group = true;
-        }
-    }
-
-    // 0 <= user_g_cpu_hard_limit <= 100, bound by FE
-    // user_g_cpu_hard_limit = 0 means no group exists
-    int empty_group_cpu_share = 100 - user_g_cpu_hard_limit;
-    if (empty_group_cpu_share > 0 && empty_group_cpu_share < 100 && !contains_empty_group) {
-        _empty_group_entity->update_empty_cpu_share(empty_group_cpu_share);
-        _enqueue_task_group(_empty_group_entity);
-    } else if ((empty_group_cpu_share == 0 || empty_group_cpu_share == 100) &&
-               contains_empty_group) {
-        // no need to update empty group here
-        // only update empty group's cpu share when exec enqueue
-        _dequeue_task_group(_empty_group_entity);
-    }
-}
-
 void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) {
     _total_cpu_share += tg_entity->cpu_share();
     // TODO llj tg If submitted back to this queue from the scanner thread, `adjust_vruntime_ns`
diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h
index 4afd685c79e..c694859e3cf 100644
--- a/be/src/vec/exec/scan/scan_task_queue.h
+++ b/be/src/vec/exec/scan/scan_task_queue.h
@@ -29,7 +29,6 @@ namespace taskgroup {
 
 using WorkFunction = std::function<void()>;
 static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
-static constexpr auto SCAN_THREAD_TIME_SLICE_US = 100000L; // 100ms
 
 // Like PriorityThreadPool::Task
 struct ScanTask {
@@ -46,7 +45,6 @@ struct ScanTask {
     vectorized::ScannerContext* scanner_context;
     TGSTEntityPtr scan_entity;
     int priority;
-    bool is_empty_task = false;
 };
 
 // Like pipeline::PriorityTaskQueue use BlockingPriorityQueue directly?
@@ -75,8 +73,6 @@ public:
 
     void update_tg_cpu_share(const taskgroup::TaskGroupInfo&, taskgroup::TGSTEntityPtr);
 
-    void reset_empty_group_entity();
-
 private:
     TGSTEntityPtr _task_entity(ScanTask& scan_task);
     void _enqueue_task_group(TGSTEntityPtr);
@@ -98,11 +94,6 @@ private:
     std::atomic<taskgroup::TGSTEntityPtr> _min_tg_entity = nullptr;
     uint64_t _min_tg_v_runtime_ns = 0;
     size_t _core_size;
-
-    TaskGroupEntity<ScanTaskQueue>* _empty_group_entity = new TaskGroupEntity<ScanTaskQueue>();
-    taskgroup::ScanTask* _empty_scan_task = new taskgroup::ScanTask();
-    // todo(wb) support auto-switch cpu mode between soft limit and hard limit
-    bool _enable_cpu_hard_limit = config::enable_cpu_hard_limit;
 };
 
 } // namespace taskgroup
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 7c0064a1f4c..8ebb6405bd8 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -379,8 +379,8 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     bool should_stop = false;
     // Has to wait at least one full block, or it will cause a lot of schedule task in priority
     // queue, it will affect query latency and query concurrency for example ssb 3.3.
-    while (!eos && raw_bytes_read < raw_bytes_threshold &&
-           (raw_rows_read < raw_rows_threshold || num_rows_in_block < state->batch_size())) {
+    while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold &&
+           num_rows_in_block < state->batch_size()) {
         // TODO llj task group should should_yield?
         if (UNLIKELY(ctx->done())) {
             // No need to set status on error here.
@@ -455,19 +455,9 @@ void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
         auto success = scan_queue->take(&scan_task);
         if (success) {
             int64_t time_spent = 0;
-            if (!scan_task.is_empty_task) {
-                RuntimeProfile::Counter tmp_timer(TUnit::TIME_NS);
-                {
-                    SCOPED_CPU_TIMER(&tmp_timer);
-                    scan_task.scan_func();
-                }
-                time_spent = tmp_timer.value();
-            } else {
-                {
-                    SCOPED_RAW_TIMER(&time_spent);
-                    usleep(taskgroup::SCAN_THREAD_TIME_SLICE_US);
-                }
-                time_spent = time_spent * _core_num / _total_query_thread_num;
+            {
+                SCOPED_RAW_TIMER(&time_spent);
+                scan_task.scan_func();
             }
             scan_queue->update_statistics(scan_task, time_spent);
         }
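
Beyond the removed empty-task branch, the _scanner_scan hunk above also tightens the scan loop's stop condition: previously a scanner kept reading past the row threshold as long as the current block was not yet a full batch, whereas now it stops as soon as either the row threshold or the batch size is reached. A tiny comparison with illustrative numbers:

    #include <cstdio>

    int main() {
        // Illustrative values: the row threshold is already exceeded, but the
        // current block is still short of a full batch.
        bool eos = false;
        long raw_bytes_read = 1L << 20, raw_bytes_threshold = 10L << 20;
        long raw_rows_read = 20000, raw_rows_threshold = 16384;
        int num_rows_in_block = 1000, batch_size = 4096;

        bool old_cond = !eos && raw_bytes_read < raw_bytes_threshold &&
                        (raw_rows_read < raw_rows_threshold || num_rows_in_block < batch_size);
        bool new_cond = !eos && raw_bytes_read < raw_bytes_threshold &&
                        raw_rows_read < raw_rows_threshold && num_rows_in_block < batch_size;

        // old_cond is true (keep scanning until the block fills up);
        // new_cond is false (stop now and return the partial block).
        std::printf("old: %d, new: %d\n", old_cond, new_cond);
        return 0;
    }
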
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 32048458ed9..25f79e89aa2 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -115,10 +115,6 @@ private:
     // true is the scheduler is closed.
     std::atomic_bool _is_closed = {false};
     bool _is_init = false;
-
-    int _core_num = CpuInfo::num_cores();
-    int _total_query_thread_num =
-            config::doris_scanner_thread_pool_thread_num + config::pipeline_executor_size;
 };
 
 struct SimplifiedScanTask {

