This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4649faf2a84 [fix](load) Fix wrong results for high-concurrent loading
(#36841)
4649faf2a84 is described below
commit 4649faf2a841d3d421ee48640770ed1f4e764dbf
Author: Gabriel <[email protected]>
AuthorDate: Wed Jun 26 19:41:42 2024 +0800
[fix](load) Fix wrong results for high-concurrent loading (#36841)
---
be/src/runtime/group_commit_mgr.cpp | 31 +++++++++++++++++++------------
be/src/runtime/group_commit_mgr.h | 1 +
2 files changed, 20 insertions(+), 12 deletions(-)
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index ab11b795ed5..111780c9a42 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -287,18 +287,25 @@ Status GroupCommitTable::get_first_block_load_queue(
if (!_is_creating_plan_fragment) {
_is_creating_plan_fragment = true;
create_plan_dep->block();
- RETURN_IF_ERROR(
- _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep
= create_plan_dep] {
- Defer defer {[&, dep = dep]() {
- dep->set_ready();
- std::unique_lock l(_lock);
- _is_creating_plan_fragment = false;
- }};
- auto st = _create_group_commit_load(be_exe_version,
mem_tracker);
- if (!st.ok()) {
- LOG(WARNING) << "create group commit load error, st="
<< st.to_string();
- }
- }));
+ RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version,
mem_tracker,
+ dep = create_plan_dep] {
+ Defer defer {[&, dep = dep]() {
+ dep->set_ready();
+ std::unique_lock l(_lock);
+ for (auto it : _create_plan_deps) {
+ it->set_ready();
+ }
+ std::vector<std::shared_ptr<pipeline::Dependency>>
{}.swap(_create_plan_deps);
+ _is_creating_plan_fragment = false;
+ }};
+ auto st = _create_group_commit_load(be_exe_version, mem_tracker);
+ if (!st.ok()) {
+ LOG(WARNING) << "create group commit load error, st=" <<
st.to_string();
+ }
+ }));
+ } else {
+ create_plan_dep->block();
+ _create_plan_deps.push_back(create_plan_dep);
}
return try_to_get_matched_queue();
}
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index 702ebb9c746..c668197e8dc 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -187,6 +187,7 @@ private:
// fragment_instance_id to load_block_queue
std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>>
_load_block_queues;
bool _is_creating_plan_fragment = false;
+ std::vector<std::shared_ptr<pipeline::Dependency>> _create_plan_deps;
};
class GroupCommitMgr {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]