github-actions[bot] commented on code in PR #28493:
URL: https://github.com/apache/doris/pull/28493#discussion_r1428773960
##########
be/src/vec/exec/scan/scanner_scheduler.cpp:
##########
@@ -186,14 +186,13 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
}
}
-[[maybe_unused]] static void* run_scanner_bthread(void* arg) {
- auto* f = reinterpret_cast<std::function<void()>*>(arg);
- (*f)();
- delete f;
- return nullptr;
-}
-
-void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
+void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
Review Comment:
warning: function '_schedule_scanners' exceeds recommended size/complexity
thresholds [readability-function-size]
```cpp
void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
                       ^
```
<details>
<summary>Additional context</summary>
**be/src/vec/exec/scan/scanner_scheduler.cpp:188:** 104 lines including
whitespace and comments (threshold 80)
```cpp
void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
                       ^
```
</details>
##########
be/src/vec/exec/scan/scanner_scheduler.cpp:
##########
@@ -227,72 +226,83 @@
// TODO(cmy): How to handle this "nice"?
int nice = 1;
auto iter = this_run.begin();
- auto submit_to_thread_pool = [&] {
- if (ctx->thread_token != nullptr) {
- // TODO llj tg how to treat this?
- while (iter != this_run.end()) {
- (*iter)->start_wait_worker_timer();
- auto s = ctx->thread_token->submit_func(
- [this, scanner = *iter, ctx] {
this->_scanner_scan(this, ctx, scanner); });
- if (s.ok()) {
- this_run.erase(iter++);
- } else {
- ctx->set_status_on_error(s);
- break;
- }
+ if (ctx->thread_token != nullptr) {
+ // TODO llj tg how to treat this?
+ while (iter != this_run.end()) {
+ (*iter)->start_wait_worker_timer();
+ auto s = ctx->thread_token->submit_func([this, scanner = *iter,
ctx] {
+ this->_scanner_scan(this, ctx, scanner);
+ ctx->signal_scanner_finished();
+ });
+ if (s.ok()) {
+ this_run.erase(iter++);
+ } else {
+ ctx->set_status_on_error(s);
+ break;
}
- } else {
- while (iter != this_run.end()) {
- (*iter)->start_wait_worker_timer();
- TabletStorageType type = (*iter)->get_storage_type();
- bool ret = false;
- if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
- if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
- auto work_func = [this, scanner = *iter, ctx] {
- this->_scanner_scan(this, ctx, scanner);
- };
- SimplifiedScanTask simple_scan_task = {work_func, ctx};
- ret =
scan_sche->get_scan_queue()->try_put(simple_scan_task);
- } else if (ctx->get_task_group() &&
config::enable_workload_group_for_scan) {
- auto work_func = [this, scanner = *iter, ctx] {
- this->_scanner_scan(this, ctx, scanner);
- };
- taskgroup::ScanTask scan_task = {
- work_func, ctx,
ctx->get_task_group()->local_scan_task_entity(),
- nice};
- ret =
_task_group_local_scan_queue->push_back(scan_task);
- } else {
- PriorityThreadPool::Task task;
- task.work_function = [this, scanner = *iter, ctx] {
- this->_scanner_scan(this, ctx, scanner);
- };
- task.priority = nice;
- ret = _local_scan_thread_pool->offer(task);
- }
+ }
+ } else {
+ while (iter != this_run.end()) {
+ (*iter)->start_wait_worker_timer();
+ TabletStorageType type = (*iter)->get_storage_type();
+ bool ret = false;
+ if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+ if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
+ auto work_func = [this, scanner = *iter, ctx] {
+ this->_scanner_scan(this, ctx, scanner);
+ ctx->signal_scanner_finished();
+ };
+ SimplifiedScanTask simple_scan_task = {work_func, ctx};
+ ret =
scan_sche->get_scan_queue()->try_put(simple_scan_task);
+ } else if (ctx->get_task_group() &&
config::enable_workload_group_for_scan) {
+ auto work_func = [this, scanner = *iter, ctx] {
+ this->_scanner_scan(this, ctx, scanner);
+ ctx->signal_scanner_finished();
+ };
+ taskgroup::ScanTask scan_task = {
+ work_func, ctx,
ctx->get_task_group()->local_scan_task_entity(), nice};
+ ret = _task_group_local_scan_queue->push_back(scan_task);
} else {
PriorityThreadPool::Task task;
task.work_function = [this, scanner = *iter, ctx] {
this->_scanner_scan(this, ctx, scanner);
+ ctx->signal_scanner_finished();
};
task.priority = nice;
- ret = _remote_scan_thread_pool->offer(task);
- }
- if (ret) {
- this_run.erase(iter++);
- } else {
- ctx->set_status_on_error(
- Status::InternalError("failed to submit scanner to scanner pool"));
- break;
+ ret = _local_scan_thread_pool->offer(task);
}
+ } else {
+ PriorityThreadPool::Task task;
+ task.work_function = [this, scanner = *iter, ctx] {
+ this->_scanner_scan(this, ctx, scanner);
+ ctx->signal_scanner_finished();
+ };
+ task.priority = nice;
+ ret = _remote_scan_thread_pool->offer(task);
+ }
+ if (ret) {
+ this_run.erase(iter++);
+ } else {
+ ctx->set_status_on_error(
+ Status::InternalError("failed to submit scanner to scanner pool"));
+ break;
}
}
- };
- submit_to_thread_pool();
+ }
ctx->incr_ctx_scheduling_time(watch.elapsed_time());
}
-void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
ScannerContext* ctx,
- VScannerSPtr scanner) {
+void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
Review Comment:
warning: function '_scanner_scan' exceeds recommended size/complexity
thresholds [readability-function-size]
```cpp
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
                       ^
```
<details>
<summary>Additional context</summary>
**be/src/vec/exec/scan/scanner_scheduler.cpp:294:** 141 lines including
whitespace and comments (threshold 80)
```cpp
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
                       ^
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]