This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new da71c0cb940 [fix](pipeline) Prevent concurrent accessing to
dependencies (#35560)
da71c0cb940 is described below
commit da71c0cb940ececebb7129ab9f56b3dfae6a6b9a
Author: Gabriel <[email protected]>
AuthorDate: Wed May 29 12:39:03 2024 +0800
[fix](pipeline) Prevent concurrent accessing to dependencies (#35560)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
If a pipeline task is cancelled by another thread while
`_extract_dependencies` is executing, the dependencies are read and
written concurrently by different threads, which can lead to serious problems.
---
be/src/pipeline/pipeline_task.cpp | 31 ++++++++++++++++++++++---------
be/src/pipeline/pipeline_task.h | 4 ++--
2 files changed, 24 insertions(+), 11 deletions(-)
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 3a956a9a863..d26b0fce387 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -112,15 +112,22 @@ Status PipelineTask::prepare(const
TPipelineInstanceParams& local_params, const
_state->get_local_state(op->operator_id())->get_query_statistics_ptr());
}
{
+ std::vector<Dependency*> filter_dependencies;
const auto& deps =
_state->get_local_state(_source->operator_id())->filter_dependencies();
std::copy(deps.begin(), deps.end(),
- std::inserter(_filter_dependencies,
_filter_dependencies.end()));
+ std::inserter(filter_dependencies,
filter_dependencies.end()));
+
+ std::unique_lock<std::mutex> lc(_dependency_lock);
+ filter_dependencies.swap(_filter_dependencies);
}
return Status::OK();
}
Status PipelineTask::_extract_dependencies() {
- _read_dependencies.resize(_operators.size());
+ std::vector<std::vector<Dependency*>> read_dependencies;
+ std::vector<Dependency*> write_dependencies;
+ std::vector<Dependency*> finish_dependencies;
+ read_dependencies.resize(_operators.size());
size_t i = 0;
for (auto& op : _operators) {
auto result = _state->get_local_state_result(op->operator_id());
@@ -128,10 +135,10 @@ Status PipelineTask::_extract_dependencies() {
return result.error();
}
auto* local_state = result.value();
- _read_dependencies[i] = local_state->dependencies();
+ read_dependencies[i] = local_state->dependencies();
auto* fin_dep = local_state->finishdependency();
if (fin_dep) {
- _finish_dependencies.push_back(fin_dep);
+ finish_dependencies.push_back(fin_dep);
}
i++;
}
@@ -142,14 +149,20 @@ Status PipelineTask::_extract_dependencies() {
});
{
auto* local_state = _state->get_sink_local_state();
- _write_dependencies = local_state->dependencies();
- DCHECK(std::all_of(_write_dependencies.begin(),
_write_dependencies.end(),
+ write_dependencies = local_state->dependencies();
+ DCHECK(std::all_of(write_dependencies.begin(),
write_dependencies.end(),
[](auto* dep) { return dep->is_write_dependency();
}));
auto* fin_dep = local_state->finishdependency();
if (fin_dep) {
- _finish_dependencies.push_back(fin_dep);
+ finish_dependencies.push_back(fin_dep);
}
}
+ {
+ std::unique_lock<std::mutex> lc(_dependency_lock);
+ read_dependencies.swap(_read_dependencies);
+ write_dependencies.swap(_write_dependencies);
+ finish_dependencies.swap(_finish_dependencies);
+ }
return Status::OK();
}
@@ -413,7 +426,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState*
state, int64_t revocable_m
}
void PipelineTask::finalize() {
- std::unique_lock<std::mutex> lc(_release_lock);
+ std::unique_lock<std::mutex> lc(_dependency_lock);
_finished = true;
_sink_shared_state.reset();
_op_shared_states.clear();
@@ -447,7 +460,7 @@ Status PipelineTask::close(Status exec_status) {
}
std::string PipelineTask::debug_string() {
- std::unique_lock<std::mutex> lc(_release_lock);
+ std::unique_lock<std::mutex> lc(_dependency_lock);
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "QueryId: {}\n",
print_id(query_context()->query_id()));
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 83ad8bec258..bb6587eec28 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -141,7 +141,7 @@ public:
void clear_blocking_state() {
// We use a lock to assure all dependencies are not deconstructed here.
- std::unique_lock<std::mutex> lc(_release_lock);
+ std::unique_lock<std::mutex> lc(_dependency_lock);
if (!_finished) {
_execution_dep->set_always_ready();
for (auto* dep : _filter_dependencies) {
@@ -298,7 +298,7 @@ private:
Dependency* _execution_dep = nullptr;
std::atomic<bool> _finished {false};
- std::mutex _release_lock;
+ std::mutex _dependency_lock;
std::atomic<bool> _running {false};
std::atomic<bool> _eos {false};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]