This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 1b47296153e [Refactor](query) refactor lock in fragment mgr and change
std::unordered_map to phmap (#45058)
1b47296153e is described below
commit 1b47296153ee7e954bf3ee2dfd926be1ab3ee204
Author: HappenLee <[email protected]>
AuthorDate: Tue Dec 10 15:39:09 2024 +0800
[Refactor](query) refactor lock in fragment mgr and change std::unordered_map
to phmap (#45058)
### What problem does this PR solve?
Cherry pick #44821
Related PR: #44821
---
be/src/runtime/fragment_mgr.cpp | 249 ++++++++++++++++++---------------------
be/src/runtime/fragment_mgr.h | 32 +++--
be/src/runtime/load_channel.cpp | 3 +-
be/src/runtime/load_stream.cpp | 2 +-
be/src/runtime/runtime_state.cpp | 2 +-
5 files changed, 138 insertions(+), 150 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 337a7aa41fc..0f29d1de6a6 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -267,8 +267,11 @@ void FragmentMgr::stop() {
// Only me can delete
{
- std::lock_guard<std::mutex> lock(_lock);
+ std::unique_lock lock(_query_ctx_map_mutex);
_query_ctx_map.clear();
+ }
+ {
+ std::unique_lock lock(_pipeline_map_mutex);
_pipeline_map.clear();
}
_thread_pool->shutdown();
@@ -620,11 +623,7 @@ Status FragmentMgr::start_query_execution(const
PExecPlanFragmentStartRequest* r
TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
- std::shared_ptr<QueryContext> q_ctx = nullptr;
- {
- std::lock_guard<std::mutex> lock(_lock);
- q_ctx = _get_or_erase_query_ctx(query_id);
- }
+ auto q_ctx = get_query_ctx(query_id);
if (q_ctx) {
q_ctx->set_ready_to_execute(Status::OK());
LOG_INFO("Query {} start execution", print_id(query_id));
@@ -639,116 +638,110 @@ Status FragmentMgr::start_query_execution(const
PExecPlanFragmentStartRequest* r
void FragmentMgr::remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
- {
- std::lock_guard<std::mutex> lock(_lock);
- auto query_id = f_context->get_query_id();
- int64 now = duration_cast<std::chrono::milliseconds>(
-
std::chrono::system_clock::now().time_since_epoch())
- .count();
- g_fragment_executing_count << -1;
- g_fragment_last_active_time.set_value(now);
- // this log will show when a query is really finished in BEs
- LOG_INFO("Removing query {} fragment {}", print_id(query_id),
f_context->get_fragment_id());
- _pipeline_map.erase({query_id, f_context->get_fragment_id()});
- }
+ auto query_id = f_context->get_query_id();
+ int64 now = duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ g_fragment_executing_count << -1;
+ g_fragment_last_active_time.set_value(now);
+
+ // this log will show when a query is really finished in BEs
+ LOG_INFO("Removing query {} fragment {}", print_id(query_id),
f_context->get_fragment_id());
+
+ std::unique_lock lock(_pipeline_map_mutex);
+ _pipeline_map.erase({query_id, f_context->get_fragment_id()});
}
-std::shared_ptr<QueryContext> FragmentMgr::_get_or_erase_query_ctx(const
TUniqueId& query_id) {
+std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId&
query_id) {
+ std::shared_lock lock(_query_ctx_map_mutex);
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
if (auto q_ctx = search->second.lock()) {
return q_ctx;
- } else {
- LOG(WARNING) << "Query context (query id = " << print_id(query_id)
- << ") has been released.";
- _query_ctx_map.erase(search);
- return nullptr;
}
}
return nullptr;
}
-std::shared_ptr<QueryContext> FragmentMgr::get_or_erase_query_ctx_with_lock(
- const TUniqueId& query_id) {
- std::unique_lock<std::mutex> lock(_lock);
- return _get_or_erase_query_ctx(query_id);
-}
-
-template <typename Params>
-Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id,
bool pipeline,
- QuerySource query_source,
- std::shared_ptr<QueryContext>& query_ctx) {
+Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams&
params,
+ TUniqueId query_id, bool pipeline,
+ QuerySource query_source,
+ std::shared_ptr<QueryContext>&
query_ctx) {
DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", {
return Status::InternalError("FragmentMgr._get_query_ctx.failed, query
id {}",
print_id(query_id));
});
+
+ // Find _query_ctx_map, in case some other request has already
+ // create the query fragments context.
+ query_ctx = get_query_ctx(query_id);
if (params.is_simplified_param) {
// Get common components from _query_ctx_map
- std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
- query_ctx = q_ctx;
- } else {
+ if (!query_ctx) {
return Status::InternalError(
"Failed to get query fragments context. Query {} may be
timeout or be "
"cancelled. host: {}",
print_id(query_id), BackendOptions::get_localhost());
}
} else {
- // Find _query_ctx_map, in case some other request has already
- // create the query fragments context.
- std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
- query_ctx = q_ctx;
- return Status::OK();
- }
+ if (!query_ctx) {
+ std::unique_lock lock(_query_ctx_map_mutex);
+ // Only one thread need create query ctx. other thread just get
query_ctx in _query_ctx_map.
+ auto search = _query_ctx_map.find(query_id);
+ if (search != _query_ctx_map.end()) {
+ query_ctx = search->second.lock();
+ }
- // First time a fragment of a query arrived. print logs.
- LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " <<
params.coord
- << ", total fragment num on current host: " <<
params.fragment_num_on_host
- << ", fe process uuid: " <<
params.query_options.fe_process_uuid
- << ", query type: " << params.query_options.query_type
- << ", report audit fe:" << params.current_connect_fe;
-
- // This may be a first fragment request of the query.
- // Create the query fragments context.
- query_ctx = QueryContext::create_shared(query_id, _exec_env,
params.query_options,
- params.coord, pipeline,
params.is_nereids,
- params.current_connect_fe,
query_source);
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
- RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool),
params.desc_tbl,
- &(query_ctx->desc_tbl)));
- // set file scan range params
- if (params.__isset.file_scan_params) {
- query_ctx->file_scan_range_params_map = params.file_scan_params;
- }
+ if (!query_ctx) {
+ // First time a fragment of a query arrived. print logs.
+ LOG(INFO) << "query_id: " << print_id(query_id) << ",
coord_addr: " << params.coord
+ << ", total fragment num on current host: " <<
params.fragment_num_on_host
+ << ", fe process uuid: " <<
params.query_options.fe_process_uuid
+ << ", query type: " <<
params.query_options.query_type
+ << ", report audit fe:" << params.current_connect_fe;
+
+ // This may be a first fragment request of the query.
+ // Create the query fragments context.
+ query_ctx = QueryContext::create_shared(query_id, _exec_env,
params.query_options,
+ params.coord,
pipeline, params.is_nereids,
+
params.current_connect_fe, query_source);
+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
+ RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool),
params.desc_tbl,
+ &(query_ctx->desc_tbl)));
+ // set file scan range params
+ if (params.__isset.file_scan_params) {
+ query_ctx->file_scan_range_params_map =
params.file_scan_params;
+ }
- query_ctx->query_globals = params.query_globals;
+ query_ctx->query_globals = params.query_globals;
- if (params.__isset.resource_info) {
- query_ctx->user = params.resource_info.user;
- query_ctx->group = params.resource_info.group;
- query_ctx->set_rsc_info = true;
- }
+ if (params.__isset.resource_info) {
+ query_ctx->user = params.resource_info.user;
+ query_ctx->group = params.resource_info.group;
+ query_ctx->set_rsc_info = true;
+ }
- _set_scan_concurrency(params, query_ctx.get());
-
- if (params.__isset.workload_groups && !params.workload_groups.empty())
{
- uint64_t tg_id = params.workload_groups[0].id;
- WorkloadGroupPtr workload_group_ptr =
-
_exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
- if (workload_group_ptr != nullptr) {
- RETURN_IF_ERROR(workload_group_ptr->add_query(query_id,
query_ctx));
-
RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
-
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
-
tg_id);
- } else {
- LOG(WARNING) << "Query/load id: " <<
print_id(query_ctx->query_id())
- << "can't find its workload group " << tg_id;
+ _set_scan_concurrency(params, query_ctx.get());
+
+ if (params.__isset.workload_groups &&
!params.workload_groups.empty()) {
+ uint64_t tg_id = params.workload_groups[0].id;
+ WorkloadGroupPtr workload_group_ptr =
+
_exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
+ if (workload_group_ptr != nullptr) {
+
RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
+
RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
+
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
+ print_id(query_id), tg_id);
+ } else {
+ LOG(WARNING) << "Query/load id: " <<
print_id(query_ctx->query_id())
+ << "can't find its workload group " <<
tg_id;
+ }
+ }
+ // There is some logic in query ctx's dctor, we could not
check if exists and delete the
+ // temp query ctx now. For example, the query id maybe removed
from workload group's queryset.
+ _query_ctx_map.insert({query_id, query_ctx});
}
}
- // There is some logic in query ctx's dctor, we could not check if
exists and delete the
- // temp query ctx now. For example, the query id maybe removed from
workload group's queryset.
- _query_ctx_map.insert(std::make_pair(query_ctx->query_id(),
query_ctx));
}
return Status::OK();
}
@@ -762,13 +755,13 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t
duration) {
fmt::memory_buffer debug_string_buffer;
size_t i = 0;
{
- std::lock_guard<std::mutex> lock(_lock);
fmt::format_to(debug_string_buffer,
"{} pipeline fragment contexts are still running!
duration_limit={}\n",
_pipeline_map.size(), duration);
-
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
+
+ std::shared_lock lock(_pipeline_map_mutex);
for (auto& it : _pipeline_map) {
auto elapsed = it.second->elapsed_time() / 1000000000.0;
if (elapsed < duration) {
@@ -787,7 +780,7 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t
duration) {
}
std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
- if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = get_query_ctx(query_id)) {
return q_ctx->print_all_pipeline_context();
} else {
return fmt::format(
@@ -806,7 +799,8 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
<<
apache::thrift::ThriftDebugString(params.query_options).c_str();
std::shared_ptr<QueryContext> query_ctx;
- RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true,
query_source, query_ctx));
+ RETURN_IF_ERROR(
+ _get_or_create_query_ctx(params, params.query_id, true,
query_source, query_ctx));
SCOPED_ATTACH_TASK(query_ctx.get());
int64_t duration_ns = 0;
std::shared_ptr<pipeline::PipelineFragmentContext> context =
@@ -839,16 +833,8 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
}
{
- // (query_id, fragment_id) is executed only on one BE, locks
_pipeline_map.
- std::lock_guard<std::mutex> lock(_lock);
for (const auto& local_param : params.local_params) {
const TUniqueId& fragment_instance_id =
local_param.fragment_instance_id;
- auto iter = _pipeline_map.find({params.query_id,
params.fragment_id});
- if (iter != _pipeline_map.end()) {
- return Status::InternalError(
- "exec_plan_fragment query_id({}) input duplicated
fragment_id({})",
- print_id(params.query_id), params.fragment_id);
- }
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
}
@@ -857,7 +843,15 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
.count();
g_fragment_executing_count << 1;
g_fragment_last_active_time.set_value(now);
- // TODO: simplify this mapping
+
+ // (query_id, fragment_id) is executed only on one BE, locks
_pipeline_map.
+ std::unique_lock lock(_pipeline_map_mutex);
+ auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
+ if (iter != _pipeline_map.end()) {
+ return Status::InternalError(
+ "exec_plan_fragment query_id({}) input duplicated
fragment_id({})",
+ print_id(params.query_id), params.fragment_id);
+ }
_pipeline_map.insert({{params.query_id, params.fragment_id}, context});
}
@@ -887,8 +881,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id,
const Status reason) {
std::shared_ptr<QueryContext> query_ctx = nullptr;
std::vector<TUniqueId> all_instance_ids;
{
- std::lock_guard<std::mutex> state_lock(_lock);
- if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = get_query_ctx(query_id)) {
query_ctx = q_ctx;
// Copy instanceids to avoid concurrent modification.
// And to reduce the scope of lock.
@@ -901,7 +894,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id,
const Status reason) {
}
query_ctx->cancel(reason);
{
- std::lock_guard<std::mutex> state_lock(_lock);
+ std::unique_lock l(_query_ctx_map_mutex);
_query_ctx_map.erase(query_id);
}
LOG(INFO) << "Query " << print_id(query_id)
@@ -937,7 +930,7 @@ void FragmentMgr::cancel_worker() {
std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> ctx;
{
- std::lock_guard<std::mutex> lock(_lock);
+ std::shared_lock lock(_pipeline_map_mutex);
ctx.reserve(_pipeline_map.size());
for (auto& pipeline_itr : _pipeline_map) {
ctx.push_back(pipeline_itr.second);
@@ -948,21 +941,24 @@ void FragmentMgr::cancel_worker() {
}
{
- std::lock_guard<std::mutex> lock(_lock);
- for (auto it = _query_ctx_map.begin(); it !=
_query_ctx_map.end();) {
- if (auto q_ctx = it->second.lock()) {
- if (q_ctx->is_timeout(now)) {
- LOG_WARNING("Query {} is timeout",
print_id(it->first));
- queries_timeout.push_back(it->first);
+ {
+ // TODO: Now only the cancel worker do the GC the
_query_ctx_map. each query must
+ // do erase the finish query unless in _query_ctx_map. Rethink
the logic is ok
+ std::unique_lock lock(_query_ctx_map_mutex);
+ for (auto it = _query_ctx_map.begin(); it !=
_query_ctx_map.end();) {
+ if (auto q_ctx = it->second.lock()) {
+ if (q_ctx->is_timeout(now)) {
+ LOG_WARNING("Query {} is timeout",
print_id(it->first));
+ queries_timeout.push_back(it->first);
+ }
++it;
} else {
- ++it;
+ it = _query_ctx_map.erase(it);
}
- } else {
- it = _query_ctx_map.erase(it);
}
}
+ std::shared_lock lock(_query_ctx_map_mutex);
// We use a very conservative cancel strategy.
// 0. If there are no running frontends, do not cancel any queries.
// 1. If query's process uuid is zero, do not cancel
@@ -1186,7 +1182,7 @@ Status FragmentMgr::apply_filterv2(const
PPublishFilterRequestV2* request,
const auto& fragment_ids = request->fragment_ids();
{
- std::unique_lock<std::mutex> lock(_lock);
+ std::shared_lock lock(_pipeline_map_mutex);
for (auto fragment_id : fragment_ids) {
if (is_pipeline) {
auto iter = _pipeline_map.find(
@@ -1242,8 +1238,7 @@ Status FragmentMgr::send_filter_size(const
PSendFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
- std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = get_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::EndOfFile(
@@ -1266,8 +1261,7 @@ Status FragmentMgr::sync_filter_size(const
PSyncFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
- std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = get_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::EndOfFile(
@@ -1287,8 +1281,7 @@ Status FragmentMgr::merge_filter(const
PMergeFilterRequest* request,
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
- std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = get_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::EndOfFile(
@@ -1305,7 +1298,7 @@ Status FragmentMgr::merge_filter(const
PMergeFilterRequest* request,
void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>*
query_info_list) {
{
- std::lock_guard<std::mutex> lock(_lock);
+ std::unique_lock lock(_query_ctx_map_mutex);
for (auto iter = _query_ctx_map.begin(); iter !=
_query_ctx_map.end();) {
if (auto q_ctx = iter->second.lock()) {
WorkloadQueryInfo workload_query_info;
@@ -1328,19 +1321,9 @@ Status FragmentMgr::get_realtime_exec_status(const
TUniqueId& query_id,
return Status::InvalidArgument("exes_status is nullptr");
}
- std::shared_ptr<QueryContext> query_context = nullptr;
-
- {
- std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
- query_context = q_ctx;
- } else {
- return Status::NotFound("Query {} has been released",
print_id(query_id));
- }
- }
-
+ std::shared_ptr<QueryContext> query_context = get_query_ctx(query_id);
if (query_context == nullptr) {
- return Status::NotFound("Query {} not found", print_id(query_id));
+ return Status::NotFound("Query {} not found or released",
print_id(query_id));
}
*exec_status = query_context->get_realtime_exec_status();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 20b2fd8cdc2..41ea67844e0 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -133,7 +133,7 @@ public:
ThreadPool* get_thread_pool() { return _thread_pool.get(); }
int32_t running_query_num() {
- std::unique_lock<std::mutex> ctx_lock(_lock);
+ std::shared_lock lock(_query_ctx_map_mutex);
return _query_ctx_map.size();
}
@@ -145,35 +145,41 @@ public:
Status get_realtime_exec_status(const TUniqueId& query_id,
TReportExecStatusParams* exec_status);
- std::shared_ptr<QueryContext> get_or_erase_query_ctx_with_lock(const
TUniqueId& query_id);
+ std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);
private:
std::shared_ptr<QueryContext> _get_or_erase_query_ctx(const TUniqueId&
query_id);
+ struct BrpcItem {
+ TNetworkAddress network_address;
+ std::vector<std::weak_ptr<QueryContext>> queries;
+ };
+
template <typename Param>
void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
- template <typename Params>
- Status _get_query_ctx(const Params& params, TUniqueId query_id, bool
pipeline,
- QuerySource query_type,
std::shared_ptr<QueryContext>& query_ctx);
+ Status _get_or_create_query_ctx(const TPipelineFragmentParams& params,
TUniqueId query_id,
+ bool pipeline, QuerySource query_type,
+ std::shared_ptr<QueryContext>& query_ctx);
// This is input params
ExecEnv* _exec_env = nullptr;
+ // The lock protect the `_pipeline_map`
+ std::shared_mutex _pipeline_map_mutex;
+ // (QueryID, FragmentID) -> PipelineFragmentContext
+ phmap::flat_hash_map<std::pair<TUniqueId, int>,
+ std::shared_ptr<pipeline::PipelineFragmentContext>>
+ _pipeline_map;
+
// The lock should only be used to protect the structures in fragment
manager. Has to be
// used in a very small scope because it may dead lock. For example, if
the _lock is used
// in prepare stage, the call path is prepare --> expr prepare --> may
call allocator
// when allocate failed, allocator may call query_is_cancelled, query is
callced will also
// call _lock, so that there is dead lock.
- std::mutex _lock;
-
- // (QueryID, FragmentID) -> PipelineFragmentContext
- std::unordered_map<std::pair<TUniqueId, int>,
- std::shared_ptr<pipeline::PipelineFragmentContext>>
- _pipeline_map;
-
+ std::shared_mutex _query_ctx_map_mutex;
// query id -> QueryContext
- std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
+ phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>
_query_ctx_map;
std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>>
_bf_size_map;
CountDownLatch _stop_background_threads_latch;
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 9369c0c833c..4ff83ff93df 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -45,8 +45,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t
timeout_s, bool is_hig
_backend_id(backend_id),
_enable_profile(enable_profile) {
std::shared_ptr<QueryContext> query_context =
-
ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(
- _load_id.to_thrift());
+
ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(_load_id.to_thrift());
std::shared_ptr<MemTrackerLimiter> mem_tracker = nullptr;
WorkloadGroupPtr wg_ptr = nullptr;
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 752e2ff95b2..60da45fa685 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -428,7 +428,7 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr*
load_stream_mgr, bool e
TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
#ifndef BE_TEST
std::shared_ptr<QueryContext> query_context =
-
ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(load_tid);
+ ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(load_tid);
if (query_context != nullptr) {
_query_thread_context = {load_tid, query_context->query_mem_tracker,
query_context->workload_group()};
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 4f24824ac70..34d9be9bcb2 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -299,7 +299,7 @@ Status RuntimeState::init(const TUniqueId&
fragment_instance_id, const TQueryOpt
}
std::weak_ptr<QueryContext> RuntimeState::get_query_ctx_weak() {
- return
_exec_env->fragment_mgr()->get_or_erase_query_ctx_with_lock(_query_ctx->query_id());
+ return _exec_env->fragment_mgr()->get_query_ctx(_query_ctx->query_id());
}
void RuntimeState::init_mem_trackers(const std::string& name, const TUniqueId&
id) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]