This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new aca8406e319 [refactor](executor)remove scan group #28847
aca8406e319 is described below
commit aca8406e319530030e065a34f19a496fab86e85b
Author: wangbo <[email protected]>
AuthorDate: Fri Dec 22 17:05:50 2023 +0800
[refactor](executor)remove scan group #28847
---
be/src/common/config.cpp | 2 -
be/src/common/config.h | 2 -
be/src/runtime/task_group/task_group.cpp | 5 -
be/src/runtime/task_group/task_group.h | 6 -
be/src/vec/exec/scan/scan_task_queue.cpp | 221 -----------------------------
be/src/vec/exec/scan/scan_task_queue.h | 99 -------------
be/src/vec/exec/scan/scanner_context.cpp | 4 -
be/src/vec/exec/scan/scanner_context.h | 1 -
be/src/vec/exec/scan/scanner_scheduler.cpp | 44 ------
be/src/vec/exec/scan/scanner_scheduler.h | 13 +-
10 files changed, 1 insertion(+), 396 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 03eaee7b23c..ecc44a08e47 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -970,8 +970,6 @@ DEFINE_Bool(enable_fuzzy_mode, "false");
DEFINE_Bool(enable_debug_points, "false");
DEFINE_Int32(pipeline_executor_size, "0");
-DEFINE_Bool(enable_workload_group_for_scan, "false");
-DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
// 128 MB
DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e011073d44d..a9508c6e8af 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1024,8 +1024,6 @@ DECLARE_Bool(enable_fuzzy_mode);
DECLARE_Bool(enable_debug_points);
DECLARE_Int32(pipeline_executor_size);
-DECLARE_Bool(enable_workload_group_for_scan);
-DECLARE_mInt64(workload_group_scan_task_wait_timeout_ms);
// Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node.
// Will remove after fully test.
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index 137f5ea2345..9e86f8b831b 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -33,7 +33,6 @@
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/mem_info.h"
#include "util/parse_util.h"
-#include "vec/exec/scan/scan_task_queue.h"
#include "vec/exec/scan/scanner_scheduler.h"
namespace doris {
@@ -102,7 +101,6 @@ std::string TaskGroupEntity<QueueType>::debug_string() const {
}
template class TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
-template class TaskGroupEntity<ScanTaskQueue>;
TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
: _id(tg_info.id),
@@ -112,7 +110,6 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
_enable_memory_overcommit(tg_info.enable_memory_overcommit),
_cpu_share(tg_info.cpu_share),
_task_entity(this, "pipeline task entity"),
- _local_scan_entity(this, "local scan entity"),
_mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
_cpu_hard_limit(tg_info.cpu_hard_limit) {}
@@ -150,8 +147,6 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
}
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share(
tg_info, &_task_entity);
- ExecEnv::GetInstance()->scanner_scheduler()->local_scan_task_queue()->update_tg_cpu_share(
- tg_info, &_local_scan_entity);
}
int64_t TaskGroup::memory_used() {
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index f1c8523664e..04dbf518f0d 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -43,7 +43,6 @@ namespace taskgroup {
class TaskGroup;
struct TaskGroupInfo;
-class ScanTaskQueue;
template <typename QueueType>
class TaskGroupEntity {
@@ -88,9 +87,6 @@ private:
using TaskGroupPipelineTaskEntity =
TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
using TGPTEntityPtr = TaskGroupPipelineTaskEntity*;
-using TaskGroupScanTaskEntity = TaskGroupEntity<ScanTaskQueue>;
-using TGSTEntityPtr = TaskGroupScanTaskEntity*;
-
struct TgTrackerLimiterGroup {
std::unordered_set<std::shared_ptr<MemTrackerLimiter>> trackers;
std::mutex group_lock;
@@ -101,7 +97,6 @@ public:
explicit TaskGroup(const TaskGroupInfo& tg_info);
TaskGroupPipelineTaskEntity* task_entity() { return &_task_entity; }
- TGSTEntityPtr local_scan_task_entity() { return &_local_scan_entity; }
int64_t version() const { return _version; }
@@ -155,7 +150,6 @@ private:
bool _enable_memory_overcommit;
std::atomic<uint64_t> _cpu_share;
TaskGroupPipelineTaskEntity _task_entity;
- TaskGroupScanTaskEntity _local_scan_entity;
std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
std::atomic<int> _cpu_hard_limit;
};
diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp
deleted file mode 100644
index 7c2068e5715..00000000000
--- a/be/src/vec/exec/scan/scan_task_queue.cpp
+++ /dev/null
@@ -1,221 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "scan_task_queue.h"
-
-#include "pipeline/pipeline_task.h"
-#include "runtime/task_group/task_group.h"
-#include "vec/exec/scan/scanner_context.h"
-
-namespace doris {
-namespace taskgroup {
-static void empty_function() {}
-ScanTask::ScanTask() : ScanTask(empty_function, nullptr, nullptr, 1) {}
-
-ScanTask::ScanTask(WorkFunction scan_func,
- std::shared_ptr<vectorized::ScannerContext> scanner_context,
- TGSTEntityPtr scan_entity, int priority)
- : scan_func(std::move(scan_func)),
- scanner_context(scanner_context),
- scan_entity(scan_entity),
- priority(priority) {}
-
-ScanTaskQueue::ScanTaskQueue() :
_queue(config::doris_scanner_thread_pool_queue_size) {}
-
-Status ScanTaskQueue::try_push_back(ScanTask scan_task) {
- if (_queue.try_put(std::move(scan_task))) {
- VLOG_DEBUG << "try_push_back scan task " <<
scan_task.scanner_context->ctx_id << " "
- << scan_task.priority;
- return Status::OK();
- } else {
- return Status::InternalError("failed to submit scan task to
ScanTaskQueue");
- }
-}
-bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) {
- auto r = _queue.blocking_get(scan_task, timeout_ms);
- if (r) {
- VLOG_DEBUG << "try get scan task " <<
scan_task->scanner_context->ctx_id << " "
- << scan_task->priority;
- }
- return r;
-}
-
-ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) :
_core_size(core_size) {}
-ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default;
-
-void ScanTaskTaskGroupQueue::close() {
- std::unique_lock<std::mutex> lock(_rs_mutex);
- _closed = true;
- _wait_task.notify_all();
-}
-
-bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) {
- std::unique_lock<std::mutex> lock(_rs_mutex);
- taskgroup::TGSTEntityPtr entity = nullptr;
- while (entity == nullptr) {
- if (_closed) {
- return false;
- }
- if (_group_entities.empty()) {
- _wait_task.wait_for(lock, std::chrono::milliseconds(
- config::workload_group_scan_task_wait_timeout_ms));
- } else {
- entity = _next_tg_entity();
- if (!entity) {
- _wait_task.wait_for(lock,
- std::chrono::milliseconds(
- config::workload_group_scan_task_wait_timeout_ms));
- }
- }
- }
- DCHECK(entity->task_size() > 0);
- if (entity->task_size() == 1) {
- _dequeue_task_group(entity);
- }
- return entity->task_queue()->try_get(
- scan_task, config::workload_group_scan_task_wait_timeout_ms /*
timeout_ms */);
-}
-
-bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
- auto* entity =
scan_task.scanner_context->get_task_group()->local_scan_task_entity();
- std::unique_lock<std::mutex> lock(_rs_mutex);
- auto status = entity->task_queue()->try_push_back(scan_task);
- if (!status.ok()) {
- LOG(WARNING) << "try_push_back scan task fail: " << status;
- return false;
- }
- if (_group_entities.find(entity) == _group_entities.end()) {
- _enqueue_task_group(entity);
- }
- _wait_task.notify_one();
- return true;
-}
-
-void ScanTaskTaskGroupQueue::update_statistics(ScanTask scan_task, int64_t
time_spent) {
- auto* entity = scan_task.scan_entity;
- std::unique_lock<std::mutex> lock(_rs_mutex);
- auto find_entity = _group_entities.find(entity);
- bool is_in_queue = find_entity != _group_entities.end();
- VLOG_DEBUG << "scan task task group queue update_statistics " <<
entity->debug_string()
- << ", in queue:" << is_in_queue << ", time_spent: " <<
time_spent;
- if (is_in_queue) {
- _group_entities.erase(entity);
- }
- entity->incr_runtime_ns(time_spent);
- if (is_in_queue) {
- _group_entities.emplace(entity);
- _update_min_tg();
- }
-}
-
-void ScanTaskTaskGroupQueue::update_tg_cpu_share(const
taskgroup::TaskGroupInfo& task_group_info,
- taskgroup::TGSTEntityPtr
entity) {
- std::unique_lock<std::mutex> lock(_rs_mutex);
- bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
- if (is_in_queue) {
- _group_entities.erase(entity);
- _total_cpu_share -= entity->cpu_share();
- }
- entity->check_and_update_cpu_share(task_group_info);
- if (is_in_queue) {
- _group_entities.emplace(entity);
- _total_cpu_share += entity->cpu_share();
- }
-}
-
-void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) {
- _total_cpu_share += tg_entity->cpu_share();
- // TODO llj tg If submitted back to this queue from the scanner thread,
`adjust_vruntime_ns`
- // should be avoided.
- /**
- * If a task group entity leaves task queue for a long time, its v runtime
will be very
- * small. This can cause it to preempt too many execution time. So, in
order to avoid this
- * situation, it is necessary to adjust the task group's v runtime.
- * */
- auto old_v_ns = tg_entity->vruntime_ns();
- auto* min_entity = _min_tg_entity.load();
- if (min_entity) {
- auto min_tg_v = min_entity->vruntime_ns();
- auto ideal_r = _ideal_runtime_ns(tg_entity) / 2;
- uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r :
min_tg_v;
- if (new_vruntime_ns > old_v_ns) {
- VLOG_DEBUG << tg_entity->debug_string() << ", adjust to new " <<
new_vruntime_ns;
- tg_entity->adjust_vruntime_ns(new_vruntime_ns);
- }
- } else if (old_v_ns < _min_tg_v_runtime_ns) {
- VLOG_DEBUG << tg_entity->debug_string() << ", adjust to " <<
_min_tg_v_runtime_ns;
- tg_entity->adjust_vruntime_ns(_min_tg_v_runtime_ns);
- }
- _group_entities.emplace(tg_entity);
- VLOG_DEBUG << "scan enqueue tg " << tg_entity->debug_string()
- << ", group entity size: " << _group_entities.size();
- _update_min_tg();
-}
-
-void ScanTaskTaskGroupQueue::_dequeue_task_group(TGSTEntityPtr tg_entity) {
- _total_cpu_share -= tg_entity->cpu_share();
- _group_entities.erase(tg_entity);
- VLOG_DEBUG << "scan task group queue dequeue tg " <<
tg_entity->debug_string()
- << ", group entity size: " << _group_entities.size();
- _update_min_tg();
-}
-
-TGSTEntityPtr ScanTaskTaskGroupQueue::_next_tg_entity() {
- taskgroup::TGSTEntityPtr res = nullptr;
- for (auto* entity : _group_entities) {
- res = entity;
- break;
- }
- return res;
-}
-
-uint64_t ScanTaskTaskGroupQueue::_ideal_runtime_ns(TGSTEntityPtr tg_entity)
const {
- // Scan task does not have time slice, so we use pipeline task's instead.
- return pipeline::PipelineTask::THREAD_TIME_SLICE * _core_size *
tg_entity->cpu_share() /
- _total_cpu_share;
-}
-
-void ScanTaskTaskGroupQueue::_update_min_tg() {
- auto* min_entity = _next_tg_entity();
- _min_tg_entity = min_entity;
- if (min_entity) {
- auto min_v_runtime = min_entity->vruntime_ns();
- if (min_v_runtime > _min_tg_v_runtime_ns) {
- _min_tg_v_runtime_ns = min_v_runtime;
- }
- }
-}
-
-bool ScanTaskTaskGroupQueue::TaskGroupSchedEntityComparator::operator()(
- const taskgroup::TGSTEntityPtr& lhs_ptr, const
taskgroup::TGSTEntityPtr& rhs_ptr) const {
- auto lhs_val = lhs_ptr->vruntime_ns();
- auto rhs_val = rhs_ptr->vruntime_ns();
- if (lhs_val != rhs_val) {
- return lhs_val < rhs_val;
- } else {
- auto l_share = lhs_ptr->cpu_share();
- auto r_share = rhs_ptr->cpu_share();
- if (l_share != r_share) {
- return l_share < r_share;
- } else {
- return lhs_ptr->task_group_id() < rhs_ptr->task_group_id();
- }
- }
-}
-
-} // namespace taskgroup
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h
deleted file mode 100644
index 18ef18872ed..00000000000
--- a/be/src/vec/exec/scan/scan_task_queue.h
+++ /dev/null
@@ -1,99 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#pragma once
-
-#include "olap/tablet.h"
-#include "runtime/task_group/task_group.h"
-#include "util/blocking_priority_queue.hpp"
-
-namespace doris {
-namespace vectorized {
-class ScannerContext;
-};
-
-namespace taskgroup {
-
-using WorkFunction = std::function<void()>;
-
-// Like PriorityThreadPool::Task
-struct ScanTask {
- ScanTask();
- ScanTask(WorkFunction scan_func,
std::shared_ptr<vectorized::ScannerContext> scanner_context,
- TGSTEntityPtr scan_entity, int priority);
- bool operator<(const ScanTask& o) const { return priority < o.priority; }
- ScanTask& operator++() {
- priority += 2;
- return *this;
- }
-
- WorkFunction scan_func;
- std::shared_ptr<vectorized::ScannerContext> scanner_context = nullptr;
- TGSTEntityPtr scan_entity;
- int priority;
-};
-
-// Like pipeline::PriorityTaskQueue use BlockingPriorityQueue directly?
-class ScanTaskQueue {
-public:
- ScanTaskQueue();
- Status try_push_back(ScanTask);
- bool try_get(ScanTask* scan_task, uint32_t timeout_ms);
- int size() { return _queue.get_size(); }
-
-private:
- BlockingPriorityQueue<ScanTask> _queue;
-};
-
-// Like TaskGroupTaskQueue
-class ScanTaskTaskGroupQueue {
-public:
- explicit ScanTaskTaskGroupQueue(size_t core_size);
- ~ScanTaskTaskGroupQueue();
-
- void close();
- bool take(ScanTask* scan_task);
- bool push_back(ScanTask);
-
- void update_statistics(ScanTask task, int64_t time_spent);
-
- void update_tg_cpu_share(const taskgroup::TaskGroupInfo&,
taskgroup::TGSTEntityPtr);
-
-private:
- TGSTEntityPtr _task_entity(ScanTask& scan_task);
- void _enqueue_task_group(TGSTEntityPtr);
- void _dequeue_task_group(TGSTEntityPtr);
- TGSTEntityPtr _next_tg_entity();
- uint64_t _ideal_runtime_ns(TGSTEntityPtr tg_entity) const;
- void _update_min_tg();
-
- // Like cfs rb tree in sched_entity
- struct TaskGroupSchedEntityComparator {
- bool operator()(const taskgroup::TGSTEntityPtr&, const
taskgroup::TGSTEntityPtr&) const;
- };
- using ResouceGroupSet = std::set<taskgroup::TGSTEntityPtr,
TaskGroupSchedEntityComparator>;
- ResouceGroupSet _group_entities;
- std::condition_variable _wait_task;
- std::mutex _rs_mutex;
- bool _closed = false;
- int _total_cpu_share = 0;
- std::atomic<taskgroup::TGSTEntityPtr> _min_tg_entity = nullptr;
- uint64_t _min_tg_v_runtime_ns = 0;
- size_t _core_size;
-};
-
-} // namespace taskgroup
-} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 99f645ca9e5..5ad2dbec5b6 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -601,10 +601,6 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
}
}
-taskgroup::TaskGroup* ScannerContext::get_task_group() const {
- return _state->get_query_ctx()->get_task_group();
-}
-
template void ScannerContext::clear_and_join(pipeline::ScanLocalStateBase*
parent,
RuntimeState* state);
template void ScannerContext::clear_and_join(VScanNode* parent, RuntimeState*
state);
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index a64b5444712..ba9c1fdee10 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -167,7 +167,6 @@ public:
return blocks_num;
}
- taskgroup::TaskGroup* get_task_group() const;
SimplifiedScanScheduler* get_simple_scan_scheduler() { return
_simple_scan_scheduler; }
void reschedule_scanner_ctx();
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 2e4db75a241..e8d7f8a7139 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -35,7 +35,6 @@
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
-#include "scan_task_queue.h"
#include "util/async_io.h" // IWYU pragma: keep
#include "util/blocking_queue.hpp"
#include "util/cpu_info.h"
@@ -88,18 +87,15 @@ void ScannerScheduler::stop() {
_is_closed = true;
- _task_group_local_scan_queue->close();
_scheduler_pool->shutdown();
_local_scan_thread_pool->shutdown();
_remote_scan_thread_pool->shutdown();
_limited_scan_thread_pool->shutdown();
- _group_local_scan_thread_pool->shutdown();
_scheduler_pool->wait();
_local_scan_thread_pool->join();
_remote_scan_thread_pool->join();
_limited_scan_thread_pool->wait();
- _group_local_scan_thread_pool->wait();
LOG(INFO) << "ScannerScheduler stopped";
}
@@ -136,19 +132,6 @@ Status ScannerScheduler::init(ExecEnv* env) {
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
.build(&_limited_scan_thread_pool));
-
- // 5. task group local scan
- _task_group_local_scan_queue =
std::make_unique<taskgroup::ScanTaskTaskGroupQueue>(
- config::doris_scanner_thread_pool_thread_num);
- static_cast<void>(ThreadPoolBuilder("local_scan_group")
- .set_min_threads(config::doris_scanner_thread_pool_thread_num)
- .set_max_threads(config::doris_scanner_thread_pool_thread_num)
- .build(&_group_local_scan_thread_pool));
- for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
- static_cast<void>(_group_local_scan_thread_pool->submit_func([this] {
- this->_task_group_scanner_scan(this,
_task_group_local_scan_queue.get());
- }));
- }
_register_metrics();
_is_init = true;
return Status::OK();
@@ -251,13 +234,6 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
ret =
scan_sche->get_scan_queue()->try_put(simple_scan_task);
- } else if (ctx->get_task_group() &&
config::enable_workload_group_for_scan) {
- auto work_func = [this, scanner = *iter, ctx] {
- this->_scanner_scan(this, ctx, scanner);
- };
- taskgroup::ScanTask scan_task = {
- work_func, ctx,
ctx->get_task_group()->local_scan_task_entity(), nice};
- ret = _task_group_local_scan_queue->push_back(scan_task);
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner = *iter, ctx] {
@@ -427,22 +403,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
ctx->push_back_scanner_and_reschedule(scanner);
}
-void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
- taskgroup::ScanTaskTaskGroupQueue* scan_queue) {
- while (!_is_closed) {
- taskgroup::ScanTask scan_task;
- auto success = scan_queue->take(&scan_task);
- if (success) {
- int64_t time_spent = 0;
- {
- SCOPED_RAW_TIMER(&time_spent);
- scan_task.scan_func();
- }
- scan_queue->update_statistics(scan_task, time_spent);
- }
- }
-}
-
void ScannerScheduler::_register_metrics() {
REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size,
[this]() { return
_local_scan_thread_pool->get_queue_size(); });
@@ -456,10 +416,6 @@ void ScannerScheduler::_register_metrics() {
[this]() { return
_limited_scan_thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
[this]() { return
_limited_scan_thread_pool->num_threads(); });
- REGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size,
- [this]() { return
_group_local_scan_thread_pool->get_queue_size(); })
- REGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num,
- [this]() { return
_group_local_scan_thread_pool->num_threads(); });
}
void ScannerScheduler::_deregister_metrics() {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 91d341613df..eb4d1380e39 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -21,7 +21,6 @@
#include <memory>
#include "common/status.h"
-#include "scan_task_queue.h"
#include "util/threadpool.h"
#include "vec/exec/scan/vscanner.h"
@@ -31,9 +30,7 @@ class ExecEnv;
namespace vectorized {
class VScanner;
} // namespace vectorized
-namespace taskgroup {
-class ScanTaskTaskGroupQueue;
-}
+
template <typename T>
class BlockingQueue;
} // namespace doris
@@ -72,9 +69,6 @@ public:
std::unique_ptr<ThreadPoolToken>
new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
int
max_concurrency);
- taskgroup::ScanTaskTaskGroupQueue* local_scan_task_queue() {
- return _task_group_local_scan_queue.get();
- }
int remote_thread_pool_max_size() const { return
_remote_thread_pool_max_size; }
@@ -87,8 +81,6 @@ private:
void _scanner_scan(ScannerScheduler* scheduler,
std::shared_ptr<ScannerContext> ctx,
VScannerSPtr scanner);
- void _task_group_scanner_scan(ScannerScheduler* scheduler,
- taskgroup::ScanTaskTaskGroupQueue*
scan_queue);
void _register_metrics();
static void _deregister_metrics();
@@ -115,9 +107,6 @@ private:
std::unique_ptr<PriorityThreadPool> _remote_scan_thread_pool;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
- std::unique_ptr<taskgroup::ScanTaskTaskGroupQueue>
_task_group_local_scan_queue;
- std::unique_ptr<ThreadPool> _group_local_scan_thread_pool;
-
// true is the scheduler is closed.
std::atomic_bool _is_closed = {false};
bool _is_init = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]