This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 738abac9eda [minor](context) duplicate query context in fragment ctx (#29364)
738abac9eda is described below
commit 738abac9eda3e39bf4ac5c2f4996b3a6a67b2919
Author: yiguolei <[email protected]>
AuthorDate: Mon Jan 1 22:08:23 2024 +0800
[minor](context) duplicate query context in fragment ctx (#29364)
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/pipeline_fragment_context.h | 4 +---
be/src/pipeline/pipeline_task.cpp | 2 +-
be/src/pipeline/task_scheduler.cpp | 4 ++--
be/src/runtime/exec_env_init.cpp | 7 ++++---
be/src/runtime/fragment_mgr.cpp | 4 ++--
be/src/vec/exec/scan/new_olap_scanner.cpp | 1 -
6 files changed, 10 insertions(+), 12 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 5009bde9b20..2a3a11d59cc 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -83,7 +83,7 @@ public:
return _runtime_state->runtime_filter_mgr();
}
- QueryContext* get_query_ctx() { return _runtime_state->get_query_ctx(); }
+ QueryContext* get_query_ctx() { return _query_ctx.get(); }
// should be protected by lock?
[[nodiscard]] bool is_canceled() const { return _runtime_state->is_cancelled(); }
@@ -108,8 +108,6 @@ public:
// TODO: Support pipeline runtime filter
- QueryContext* get_query_context() { return _query_ctx.get(); }
-
TUniqueId get_query_id() const { return _query_id; }
[[nodiscard]] int get_fragment_id() const { return _fragment_id; }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 75694373a4c..5f5ff56aa4b 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -357,7 +357,7 @@ Status PipelineTask::close(Status exec_status) {
}
QueryContext* PipelineTask::query_context() {
- return _fragment_context->get_query_context();
+ return _fragment_context->get_query_ctx();
}
// The FSM see PipelineTaskState's comment
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index e0e668acbd9..e814e4cdf2d 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -247,7 +247,7 @@ void TaskScheduler::_do_work(size_t index) {
if (state == PipelineTaskState::PENDING_FINISH) {
DCHECK(task->is_pipelineX() || !task->is_pending_finish())
<< "must not pending close " << task->debug_string();
- Status exec_status = fragment_ctx->get_query_context()->exec_status();
+ Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
_try_close_task(task,
canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
exec_status);
@@ -264,7 +264,7 @@ void TaskScheduler::_do_work(size_t index) {
// If pipeline is canceled, it will report after pipeline closed, and will propagate
// errors to downstream through exchange. So, here we needn't send_report.
// fragment_ctx->send_report(true);
- Status cancel_status = fragment_ctx->get_query_context()->exec_status();
+ Status cancel_status = fragment_ctx->get_query_ctx()->exec_status();
_try_close_task(task, PipelineTaskState::CANCELED, cancel_status);
continue;
}
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 508ea7c7921..2a1233c9b5d 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -588,9 +588,6 @@ void ExecEnv::destroy() {
SAFE_DELETE(_scanner_scheduler);
// _storage_page_cache must be destoried before _cache_manager
SAFE_DELETE(_storage_page_cache);
- // cache_manager must be destoried after _inverted_index_query_cache
- // https://github.com/apache/doris/issues/24082#issuecomment-1712544039
- SAFE_DELETE(_cache_manager);
SAFE_DELETE(_small_file_mgr);
SAFE_DELETE(_broker_mgr);
@@ -629,6 +626,10 @@ void ExecEnv::destroy() {
SAFE_DELETE(_external_scan_context_mgr);
SAFE_DELETE(_user_function_cache);
+ // cache_manager must be destoried after _inverted_index_query_cache
+ // https://github.com/apache/doris/issues/24082#issuecomment-1712544039
+ SAFE_DELETE(_cache_manager);
+
// _heartbeat_flags must be destoried after staroge engine
SAFE_DELETE(_heartbeat_flags);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9256c3bccfd..d265b5e35b5 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -576,7 +576,7 @@ void FragmentMgr::remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
std::lock_guard<std::mutex> lock(_lock);
auto query_id = f_context->get_query_id();
- auto* q_context = f_context->get_query_context();
+ auto* q_context = f_context->get_query_ctx();
std::vector<TUniqueId> ins_ids;
f_context->instance_ids(ins_ids);
bool all_done = q_context->countdown(ins_ids.size());
@@ -1413,7 +1413,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
DCHECK(pip_context != nullptr);
runtime_filter_mgr =
pip_context->get_query_ctx()->runtime_filter_mgr();
- pool = &pip_context->get_query_context()->obj_pool;
+ pool = &pip_context->get_query_ctx()->obj_pool;
} else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_instance_map.find(tfragment_instance_id);
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 5566abec3ab..bc15cf7207f 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -524,7 +524,6 @@ Status NewOlapScanner::close(RuntimeState* state) {
// so that it will core
_tablet_reader_params.rs_splits.clear();
_tablet_reader.reset();
- LOG(INFO) << "close_tablet_id" << _tablet_reader_params.tablet->tablet_id();
RETURN_IF_ERROR(VScanner::close(state));
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]