This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 46d40b19525 [refactor](executor)Remove empty group logic #26005
46d40b19525 is described below
commit 46d40b195258d8abc34d56dc177d209d9873b07f
Author: wangbo <[email protected]>
AuthorDate: Fri Oct 27 14:24:41 2023 +0800
[refactor](executor)Remove empty group logic #26005
---
be/src/common/config.cpp | 1 -
be/src/common/config.h | 1 -
be/src/pipeline/pipeline_task.cpp | 23 ++-------------
be/src/pipeline/pipeline_task.h | 20 -------------
be/src/pipeline/task_queue.cpp | 46 ++---------------------------
be/src/pipeline/task_queue.h | 9 ------
be/src/pipeline/task_scheduler.cpp | 4 ---
be/src/runtime/task_group/task_group.cpp | 17 +----------
be/src/runtime/task_group/task_group.h | 10 -------
be/src/vec/exec/scan/scan_task_queue.cpp | 47 ++----------------------------
be/src/vec/exec/scan/scan_task_queue.h | 9 ------
be/src/vec/exec/scan/scanner_scheduler.cpp | 20 ++++---------
be/src/vec/exec/scan/scanner_scheduler.h | 4 ---
13 files changed, 13 insertions(+), 198 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 58121768723..5b6e393feed 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1110,7 +1110,6 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
// cgroup
DEFINE_String(doris_cgroup_cpu_path, "");
-DEFINE_Bool(enable_cpu_hard_limit, "false");
DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f8350d87316..71c0a1e12c4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1177,7 +1177,6 @@ DECLARE_mBool(exit_on_exception);
// cgroup
DECLARE_String(doris_cgroup_cpu_path);
-DECLARE_Bool(enable_cpu_hard_limit);
// This config controls whether the s3 file writer would flush cache
asynchronously
DECLARE_Bool(enable_flush_file_cache_async);
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 7e907b318d2..111259062cf 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -215,16 +215,6 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) {
_task_queue = task_queue;
}
-void PipelineTask::yield() {
- int64_t time_spent = 0;
- Defer defer {[&]() {
- time_spent = time_spent * _core_num / _total_query_thread_num;
- _task_queue->update_statistics(this, time_spent);
- }};
- SCOPED_RAW_TIMER(&time_spent);
- usleep(THREAD_TIME_SLICE_US);
-}
-
Status PipelineTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
@@ -232,12 +222,8 @@ Status PipelineTask::execute(bool* eos) {
SCOPED_ATTACH_TASK(_state);
int64_t time_spent = 0;
- // todo(wb) use a more lightweight timer
- RuntimeProfile::Counter tmp_timer(TUnit::TIME_NS);
-
Defer defer {[&]() {
if (_task_queue) {
- time_spent = tmp_timer.value();
_task_queue->update_statistics(this, time_spent);
}
}};
@@ -245,7 +231,7 @@ Status PipelineTask::execute(bool* eos) {
*eos = false;
if (!_opened) {
{
- SCOPED_CPU_TIMER(&tmp_timer);
+ SCOPED_RAW_TIMER(&time_spent);
auto st = _open();
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
@@ -280,12 +266,12 @@ Status PipelineTask::execute(bool* eos) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
break;
}
- if (tmp_timer.value() > THREAD_TIME_SLICE) {
+ if (time_spent > THREAD_TIME_SLICE) {
COUNTER_UPDATE(_yield_counts, 1);
break;
}
// TODO llj: Pipeline entity should_yield
- SCOPED_CPU_TIMER(&tmp_timer);
+ SCOPED_RAW_TIMER(&time_spent);
_block->clear_column_data(_root->row_desc().num_materialized_slots());
auto* block = _block.get();
@@ -477,9 +463,6 @@ std::string PipelineTask::debug_string() {
}
taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity()
const {
- if (_is_empty_task) {
- return _empty_group_entity;
- }
return _fragment_context->get_task_group_entity();
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 99e41421bb3..ef923868a51 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -117,8 +117,6 @@ public:
PipelineFragmentContext* fragment_context, RuntimeProfile*
parent_profile);
virtual ~PipelineTask() = default;
- PipelineTask() = default;
-
virtual Status prepare(RuntimeState* state);
virtual Status execute(bool* eos);
@@ -198,7 +196,6 @@ public:
TaskQueue* get_task_queue() { return _task_queue; }
static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;
- static constexpr auto THREAD_TIME_SLICE_US = 100000L; // 100ms
// 1 used for update priority queue
// note(wb) an ugly implementation, need refactor later
@@ -252,17 +249,6 @@ public:
TUniqueId instance_id() const { return _state->fragment_instance_id(); }
- void set_empty_task(bool is_empty_task) { _is_empty_task = is_empty_task; }
-
- bool is_empty_task() const { return _is_empty_task; }
-
- void yield();
-
- void set_task_group_entity(
- taskgroup::TaskGroupEntity<std::queue<pipeline::PipelineTask*>>*
empty_group_entity) {
- _empty_group_entity = empty_group_entity;
- }
-
protected:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
@@ -302,12 +288,6 @@ protected:
bool _try_close_flag = false;
- bool _is_empty_task = false;
- taskgroup::TaskGroupEntity<std::queue<pipeline::PipelineTask*>>*
_empty_group_entity;
- int _core_num = CpuInfo::num_cores();
- int _total_query_thread_num =
- config::doris_scanner_thread_pool_thread_num +
config::pipeline_executor_size;
-
RuntimeProfile* _parent_profile;
std::unique_ptr<RuntimeProfile> _task_profile;
RuntimeProfile::Counter* _task_cpu_timer;
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 366e299f7b5..a68a2ba4a7f 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -222,17 +222,9 @@ bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
}
TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size)
- : TaskQueue(core_size), _min_tg_entity(nullptr) {
- _empty_pip_task->set_empty_task(true);
- _empty_pip_task->set_task_queue(this);
- _empty_pip_task->set_task_group_entity(_empty_group_entity);
- _empty_group_entity->set_empty_group_entity(true);
-}
+ : TaskQueue(core_size), _min_tg_entity(nullptr) {}
-TaskGroupTaskQueue::~TaskGroupTaskQueue() {
- delete _empty_group_entity;
- delete _empty_pip_task;
-}
+TaskGroupTaskQueue::~TaskGroupTaskQueue() = default;
void TaskGroupTaskQueue::close() {
std::unique_lock<std::mutex> lock(_rs_mutex);
@@ -256,9 +248,6 @@ Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
entity->task_queue()->emplace(task);
if (_group_entities.find(entity) == _group_entities.end()) {
_enqueue_task_group<from_executor>(entity);
- if (_enable_cpu_hard_limit) {
- reset_empty_group_entity();
- }
}
_wait_task.notify_one();
return Status::OK();
@@ -281,15 +270,9 @@ PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
}
}
}
- if (entity->is_empty_group_entity()) {
- return _empty_pip_task;
- }
DCHECK(entity->task_size() > 0);
if (entity->task_size() == 1) {
_dequeue_task_group(entity);
- if (_enable_cpu_hard_limit) {
- reset_empty_group_entity();
- }
}
auto task = entity->task_queue()->front();
if (task) {
@@ -391,30 +374,5 @@ void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& tas
}
}
-void TaskGroupTaskQueue::reset_empty_group_entity() {
- int user_g_cpu_hard_limit = 0;
- bool contains_empty_group = false;
- for (auto* entity : _group_entities) {
- if (!entity->is_empty_group_entity()) {
- user_g_cpu_hard_limit += entity->cpu_share();
- } else {
- contains_empty_group = true;
- }
- }
-
- // 0 <= user_g_cpu_hard_limit <= 100, bound by FE
- // user_g_cpu_hard_limit = 0 means no group exists
- int empty_group_cpu_share = 100 - user_g_cpu_hard_limit;
- if (empty_group_cpu_share > 0 && empty_group_cpu_share < 100 &&
!contains_empty_group) {
- _empty_group_entity->update_empty_cpu_share(empty_group_cpu_share);
- _enqueue_task_group<true>(_empty_group_entity);
- } else if ((empty_group_cpu_share == 0 || empty_group_cpu_share == 100) &&
- contains_empty_group) {
- // no need to update empty group here
- // only update empty group's cpu share when exec enqueue
- _dequeue_task_group(_empty_group_entity);
- }
-}
-
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index e6ed54417fd..d693cbe2168 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -187,8 +187,6 @@ public:
void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
taskgroup::TGPTEntityPtr entity) override;
- void reset_empty_group_entity();
-
private:
template <bool from_executor>
Status _push_back(PipelineTask* task);
@@ -211,13 +209,6 @@ private:
int _total_cpu_share = 0;
std::atomic<taskgroup::TGPTEntityPtr> _min_tg_entity = nullptr;
uint64_t _min_tg_v_runtime_ns = 0;
-
- // empty group
- taskgroup::TaskGroupEntity<std::queue<pipeline::PipelineTask*>>*
_empty_group_entity =
- new
taskgroup::TaskGroupEntity<std::queue<pipeline::PipelineTask*>>();
- PipelineTask* _empty_pip_task = new PipelineTask();
- // todo(wb) support auto-switch cpu mode between soft limit and hard limit
- bool _enable_cpu_hard_limit = config::enable_cpu_hard_limit;
};
} // namespace pipeline
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index cdd934d5c7d..d1978782554 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -222,10 +222,6 @@ void TaskScheduler::_do_work(size_t index) {
if (!task) {
continue;
}
- if (task->is_empty_task()) {
- task->yield();
- continue;
- }
task->set_task_queue(_task_queue.get());
auto* fragment_ctx = task->fragment_context();
signal::query_id_hi = fragment_ctx->get_query_id().hi;
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index 7abc08d3c27..37e4b9ae597 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -84,7 +84,7 @@ uint64_t TaskGroupEntity<QueueType>::cpu_share() const {
template <typename QueueType>
uint64_t TaskGroupEntity<QueueType>::task_group_id() const {
- return _is_empty_group_entity ? -1 : _tg->id();
+ return _tg->id();
}
template <typename QueueType>
@@ -101,21 +101,6 @@ std::string TaskGroupEntity<QueueType>::debug_string() const {
_tg->id(), _tg->name(), _type, cpu_share(),
task_size(), _vruntime_ns);
}
-template <typename QueueType>
-void TaskGroupEntity<QueueType>::set_empty_group_entity(bool
is_empty_group_entity) {
- _is_empty_group_entity = is_empty_group_entity;
-}
-
-template <typename QueueType>
-bool TaskGroupEntity<QueueType>::is_empty_group_entity() {
- return _is_empty_group_entity;
-}
-
-template <typename QueueType>
-void TaskGroupEntity<QueueType>::update_empty_cpu_share(uint64_t
empty_cpu_share) {
- _cpu_share = empty_cpu_share;
-}
-
template class TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
template class TaskGroupEntity<ScanTaskQueue>;
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index 1dc634469ee..a948bf53ec0 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -50,8 +50,6 @@ public:
explicit TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type);
~TaskGroupEntity();
- TaskGroupEntity() = default; // used for empty group entity
-
uint64_t vruntime_ns() const { return _vruntime_ns; }
QueueType* task_queue();
@@ -70,12 +68,6 @@ public:
void check_and_update_cpu_share(const TaskGroupInfo& tg_info);
- void set_empty_group_entity(bool is_empty_group_entity);
-
- bool is_empty_group_entity();
-
- void update_empty_cpu_share(uint64_t empty_cpu_share);
-
private:
QueueType* _task_queue;
@@ -89,8 +81,6 @@ private:
// independent updates.
int64_t _version;
uint64_t _cpu_share;
-
- bool _is_empty_group_entity = false;
};
// TODO llj tg use PriorityTaskQueue to replace std::queue
diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp
index 90171bc87c4..89235b6b7a9 100644
--- a/be/src/vec/exec/scan/scan_task_queue.cpp
+++ b/be/src/vec/exec/scan/scan_task_queue.cpp
@@ -53,16 +53,8 @@ bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) {
return r;
}
-ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) :
_core_size(core_size) {
- _empty_scan_task->scan_entity = _empty_group_entity;
- _empty_scan_task->is_empty_task = true;
- _empty_group_entity->set_empty_group_entity(true);
-}
-
-ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() {
- delete _empty_group_entity;
- delete _empty_scan_task;
-}
+ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) :
_core_size(core_size) {}
+ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default;
void ScanTaskTaskGroupQueue::close() {
std::unique_lock<std::mutex> lock(_rs_mutex);
@@ -86,16 +78,9 @@ bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) {
}
}
}
- if (entity->is_empty_group_entity()) {
- *scan_task = *_empty_scan_task;
- return true;
- }
DCHECK(entity->task_size() > 0);
if (entity->task_size() == 1) {
_dequeue_task_group(entity);
- if (_enable_cpu_hard_limit) {
- reset_empty_group_entity();
- }
}
return entity->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS
/* timeout_ms */);
}
@@ -110,9 +95,6 @@ bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
}
if (_group_entities.find(entity) == _group_entities.end()) {
_enqueue_task_group(entity);
- if (_enable_cpu_hard_limit) {
- reset_empty_group_entity();
- }
}
_wait_task.notify_one();
return true;
@@ -150,31 +132,6 @@ void ScanTaskTaskGroupQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo&
}
}
-void ScanTaskTaskGroupQueue::reset_empty_group_entity() {
- int user_g_cpu_hard_limit = 0;
- bool contains_empty_group = false;
- for (auto* entity : _group_entities) {
- if (!entity->is_empty_group_entity()) {
- user_g_cpu_hard_limit += entity->cpu_share();
- } else {
- contains_empty_group = true;
- }
- }
-
- // 0 <= user_g_cpu_hard_limit <= 100, bound by FE
- // user_g_cpu_hard_limit = 0 means no group exists
- int empty_group_cpu_share = 100 - user_g_cpu_hard_limit;
- if (empty_group_cpu_share > 0 && empty_group_cpu_share < 100 &&
!contains_empty_group) {
- _empty_group_entity->update_empty_cpu_share(empty_group_cpu_share);
- _enqueue_task_group(_empty_group_entity);
- } else if ((empty_group_cpu_share == 0 || empty_group_cpu_share == 100) &&
- contains_empty_group) {
- // no need to update empty group here
- // only update empty group's cpu share when exec enqueue
- _dequeue_task_group(_empty_group_entity);
- }
-}
-
void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) {
_total_cpu_share += tg_entity->cpu_share();
// TODO llj tg If submitted back to this queue from the scanner thread,
`adjust_vruntime_ns`
diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h
index 4afd685c79e..c694859e3cf 100644
--- a/be/src/vec/exec/scan/scan_task_queue.h
+++ b/be/src/vec/exec/scan/scan_task_queue.h
@@ -29,7 +29,6 @@ namespace taskgroup {
using WorkFunction = std::function<void()>;
static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
-static constexpr auto SCAN_THREAD_TIME_SLICE_US = 100000L; // 100ms
// Like PriorityThreadPool::Task
struct ScanTask {
@@ -46,7 +45,6 @@ struct ScanTask {
vectorized::ScannerContext* scanner_context;
TGSTEntityPtr scan_entity;
int priority;
- bool is_empty_task = false;
};
// Like pipeline::PriorityTaskQueue use BlockingPriorityQueue directly?
@@ -75,8 +73,6 @@ public:
void update_tg_cpu_share(const taskgroup::TaskGroupInfo&,
taskgroup::TGSTEntityPtr);
- void reset_empty_group_entity();
-
private:
TGSTEntityPtr _task_entity(ScanTask& scan_task);
void _enqueue_task_group(TGSTEntityPtr);
@@ -98,11 +94,6 @@ private:
std::atomic<taskgroup::TGSTEntityPtr> _min_tg_entity = nullptr;
uint64_t _min_tg_v_runtime_ns = 0;
size_t _core_size;
-
- TaskGroupEntity<ScanTaskQueue>* _empty_group_entity = new
TaskGroupEntity<ScanTaskQueue>();
- taskgroup::ScanTask* _empty_scan_task = new taskgroup::ScanTask();
- // todo(wb) support auto-switch cpu mode between soft limit and hard limit
- bool _enable_cpu_hard_limit = config::enable_cpu_hard_limit;
};
} // namespace taskgroup
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 7c0064a1f4c..8ebb6405bd8 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -379,8 +379,8 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
bool should_stop = false;
// Has to wait at least one full block, or it will cause a lot of schedule
task in priority
// queue, it will affect query latency and query concurrency for example
ssb 3.3.
- while (!eos && raw_bytes_read < raw_bytes_threshold &&
- (raw_rows_read < raw_rows_threshold || num_rows_in_block <
state->batch_size())) {
+ while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read <
raw_rows_threshold &&
+ num_rows_in_block < state->batch_size()) {
// TODO llj task group should should_yield?
if (UNLIKELY(ctx->done())) {
// No need to set status on error here.
@@ -455,19 +455,9 @@ void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
auto success = scan_queue->take(&scan_task);
if (success) {
int64_t time_spent = 0;
- if (!scan_task.is_empty_task) {
- RuntimeProfile::Counter tmp_timer(TUnit::TIME_NS);
- {
- SCOPED_CPU_TIMER(&tmp_timer);
- scan_task.scan_func();
- }
- time_spent = tmp_timer.value();
- } else {
- {
- SCOPED_RAW_TIMER(&time_spent);
- usleep(taskgroup::SCAN_THREAD_TIME_SLICE_US);
- }
- time_spent = time_spent * _core_num / _total_query_thread_num;
+ {
+ SCOPED_RAW_TIMER(&time_spent);
+ scan_task.scan_func();
}
scan_queue->update_statistics(scan_task, time_spent);
}
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 32048458ed9..25f79e89aa2 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -115,10 +115,6 @@ private:
// true is the scheduler is closed.
std::atomic_bool _is_closed = {false};
bool _is_init = false;
-
- int _core_num = CpuInfo::num_cores();
- int _total_query_thread_num =
- config::doris_scanner_thread_pool_thread_num +
config::pipeline_executor_size;
};
struct SimplifiedScanTask {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]