This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d3317aa33b [Fix](executor)Fix scan entity core #21696
d3317aa33b is described below
commit d3317aa33b9a01bb3b5ebad1e761df5ea38e747c
Author: wangbo <[email protected]>
AuthorDate: Tue Jul 11 15:56:13 2023 +0800
[Fix](executor)Fix scan entity core #21696
After the last call to scan_task.scan_func(), the fragment should have ended,
which means its PipelineFragmentContext could be released.
Once the PipelineFragmentContext has been released, accessing its fields such as
query_ctx or _state may cause a core dump.
However, this can only explain core 2.
// Worker loop for the task-group scan queue: while _is_closed is false, take a
// ScanTask from scan_queue, run its scan_func, and then pass the task together
// with time_spent to the queue's update_statistics().
// The scheduler parameter is not referenced in this body.
void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
taskgroup::ScanTaskTaskGroupQueue* scan_queue) {
while (!_is_closed) {
taskgroup::ScanTask scan_task;
// Only run and account for a task when take() reports success.
auto success = scan_queue->take(&scan_task);
if (success) {
int64_t time_spent = 0;
{
// Inner scope limits the timer to the scan_func() call only; presumably
// SCOPED_RAW_TIMER stores the elapsed time into time_spent (confirm the
// macro's units and semantics).
SCOPED_RAW_TIMER(&time_spent);
scan_task.scan_func();
}
// NOTE(review): per the commit message, PipelineFragmentContext may already
// be released once the last scan_func() call has returned, so
// update_statistics() should not dereference state owned by it.
scan_queue->update_statistics(scan_task, time_spent);
}
}
}
---
be/src/vec/exec/scan/scan_task_queue.cpp | 11 +++++++----
be/src/vec/exec/scan/scan_task_queue.h | 4 +++-
be/src/vec/exec/scan/scanner_scheduler.cpp | 4 +++-
3 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp
b/be/src/vec/exec/scan/scan_task_queue.cpp
index 538f77211c..89235b6b7a 100644
--- a/be/src/vec/exec/scan/scan_task_queue.cpp
+++ b/be/src/vec/exec/scan/scan_task_queue.cpp
@@ -24,11 +24,14 @@
namespace doris {
namespace taskgroup {
static void empty_function() {}
-ScanTask::ScanTask() : ScanTask(empty_function, nullptr, 1) {}
+ScanTask::ScanTask() : ScanTask(empty_function, nullptr, nullptr, 1) {}
ScanTask::ScanTask(WorkFunction scan_func, vectorized::ScannerContext*
scanner_context,
- int priority)
- : scan_func(std::move(scan_func)), scanner_context(scanner_context),
priority(priority) {}
+ TGSTEntityPtr scan_entity, int priority)
+ : scan_func(std::move(scan_func)),
+ scanner_context(scanner_context),
+ scan_entity(scan_entity),
+ priority(priority) {}
ScanTaskQueue::ScanTaskQueue() :
_queue(config::doris_scanner_thread_pool_queue_size) {}
@@ -98,7 +101,7 @@ bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
}
void ScanTaskTaskGroupQueue::update_statistics(ScanTask scan_task, int64_t
time_spent) {
- auto* entity =
scan_task.scanner_context->get_task_group()->local_scan_task_entity();
+ auto* entity = scan_task.scan_entity;
std::unique_lock<std::mutex> lock(_rs_mutex);
auto find_entity = _group_entities.find(entity);
bool is_in_queue = find_entity != _group_entities.end();
diff --git a/be/src/vec/exec/scan/scan_task_queue.h
b/be/src/vec/exec/scan/scan_task_queue.h
index f3c3b792a4..c694859e3c 100644
--- a/be/src/vec/exec/scan/scan_task_queue.h
+++ b/be/src/vec/exec/scan/scan_task_queue.h
@@ -33,7 +33,8 @@ static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
// Like PriorityThreadPool::Task
struct ScanTask {
ScanTask();
- ScanTask(WorkFunction scan_func, vectorized::ScannerContext*
scanner_context, int priority);
+ ScanTask(WorkFunction scan_func, vectorized::ScannerContext*
scanner_context,
+ TGSTEntityPtr scan_entity, int priority);
bool operator<(const ScanTask& o) const { return priority < o.priority; }
ScanTask& operator++() {
priority += 2;
@@ -42,6 +43,7 @@ struct ScanTask {
WorkFunction scan_func;
vectorized::ScannerContext* scanner_context;
+ TGSTEntityPtr scan_entity;
int priority;
};
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index a798d097a2..c37760167f 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -226,7 +226,9 @@ void ScannerScheduler::_schedule_scanners(ScannerContext*
ctx) {
auto work_func = [this, scanner = *iter, ctx] {
this->_scanner_scan(this, ctx, scanner);
};
- taskgroup::ScanTask scan_task = {work_func, ctx, nice};
+ taskgroup::ScanTask scan_task = {
+ work_func, ctx,
ctx->get_task_group()->local_scan_task_entity(),
+ nice};
ret =
_task_group_local_scan_queue->push_back(scan_task);
} else {
PriorityThreadPool::Task task;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]