This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d4249e4 [Bug] fix Runtime filter can't find fragment-id when
apply_filter called early (#6923)
d4249e4 is described below
commit d4249e4f2dddd13ca6db472d772a7b2c5146be40
Author: Pxl <[email protected]>
AuthorDate: Wed Oct 27 09:54:52 2021 +0800
[Bug] fix Runtime filter can't find fragment-id when apply_filter called
early (#6923)
#6921
---
be/src/runtime/fragment_mgr.cpp | 14 +++++++++++---
be/src/runtime/fragment_mgr.h | 2 ++
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f980e7b..a8ff209 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -610,6 +610,7 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params, Fi
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.insert(std::make_pair(params.params.fragment_instance_id,
exec_state));
+ _cv.notify_all();
}
auto st = _thread_pool->submit_func(
@@ -747,7 +748,7 @@ Status FragmentMgr::exec_external_plan_fragment(const
TScanOpenParams& params,
}
VLOG_QUERY << "BackendService execute open() TQueryPlanInfo: "
- << apache::thrift::ThriftDebugString(t_query_plan_info);
+ << apache::thrift::ThriftDebugString(t_query_plan_info);
// assign the param used to execute PlanFragment
TExecPlanFragmentParams exec_fragment_params;
exec_fragment_params.protocol_version =
(PaloInternalServiceVersion::type)0;
@@ -811,14 +812,21 @@ Status FragmentMgr::apply_filter(const
PPublishFilterRequest* request, const cha
std::shared_ptr<FragmentExecState> fragment_state;
{
- std::lock_guard<std::mutex> lock(_lock);
+ std::unique_lock<std::mutex> lock(_lock);
+ if (!_fragment_map.count(tfragment_instance_id)) {
+ VLOG_NOTICE << "wait for fragment start execute, fragment-id:" <<
fragment_instance_id;
+ _cv.wait_for(lock, std::chrono::milliseconds(1000),
+ [&] { return
_fragment_map.count(tfragment_instance_id); });
+ }
+
auto iter = _fragment_map.find(tfragment_instance_id);
if (iter == _fragment_map.end()) {
- LOG(WARNING) << "unknown.... fragment-id:" << fragment_instance_id;
+ VLOG_CRITICAL << "unknown.... fragment-id:" <<
fragment_instance_id;
return Status::InvalidArgument("fragment-id");
}
fragment_state = iter->second;
}
+
DCHECK(fragment_state != nullptr);
RuntimeFilterMgr* runtime_filter_mgr =
fragment_state->executor()->runtime_state()->runtime_filter_mgr();
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 70233e1..ba56216 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -99,6 +99,8 @@ private:
std::mutex _lock;
+ std::condition_variable _cv;
+
// Make sure that remove this before no data reference FragmentExecState
std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>>
_fragment_map;
// query id -> QueryFragmentsCtx
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]