This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new da71c0cb940 [fix](pipeline) Prevent concurrent access to 
dependencies (#35560)
da71c0cb940 is described below

commit da71c0cb940ececebb7129ab9f56b3dfae6a6b9a
Author: Gabriel <[email protected]>
AuthorDate: Wed May 29 12:39:03 2024 +0800

    [fix](pipeline) Prevent concurrent access to dependencies (#35560)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    If a pipeline task is cancelled by another thread while it is executing
    `extract_dependencies`, the dependency containers will be accessed
    concurrently by different reader/writer threads, which can lead to
    serious problems such as data races.
---
 be/src/pipeline/pipeline_task.cpp | 31 ++++++++++++++++++++++---------
 be/src/pipeline/pipeline_task.h   |  4 ++--
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 3a956a9a863..d26b0fce387 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -112,15 +112,22 @@ Status PipelineTask::prepare(const 
TPipelineInstanceParams& local_params, const
                 
_state->get_local_state(op->operator_id())->get_query_statistics_ptr());
     }
     {
+        std::vector<Dependency*> filter_dependencies;
         const auto& deps = 
_state->get_local_state(_source->operator_id())->filter_dependencies();
         std::copy(deps.begin(), deps.end(),
-                  std::inserter(_filter_dependencies, 
_filter_dependencies.end()));
+                  std::inserter(filter_dependencies, 
filter_dependencies.end()));
+
+        std::unique_lock<std::mutex> lc(_dependency_lock);
+        filter_dependencies.swap(_filter_dependencies);
     }
     return Status::OK();
 }
 
 Status PipelineTask::_extract_dependencies() {
-    _read_dependencies.resize(_operators.size());
+    std::vector<std::vector<Dependency*>> read_dependencies;
+    std::vector<Dependency*> write_dependencies;
+    std::vector<Dependency*> finish_dependencies;
+    read_dependencies.resize(_operators.size());
     size_t i = 0;
     for (auto& op : _operators) {
         auto result = _state->get_local_state_result(op->operator_id());
@@ -128,10 +135,10 @@ Status PipelineTask::_extract_dependencies() {
             return result.error();
         }
         auto* local_state = result.value();
-        _read_dependencies[i] = local_state->dependencies();
+        read_dependencies[i] = local_state->dependencies();
         auto* fin_dep = local_state->finishdependency();
         if (fin_dep) {
-            _finish_dependencies.push_back(fin_dep);
+            finish_dependencies.push_back(fin_dep);
         }
         i++;
     }
@@ -142,14 +149,20 @@ Status PipelineTask::_extract_dependencies() {
     });
     {
         auto* local_state = _state->get_sink_local_state();
-        _write_dependencies = local_state->dependencies();
-        DCHECK(std::all_of(_write_dependencies.begin(), 
_write_dependencies.end(),
+        write_dependencies = local_state->dependencies();
+        DCHECK(std::all_of(write_dependencies.begin(), 
write_dependencies.end(),
                            [](auto* dep) { return dep->is_write_dependency(); 
}));
         auto* fin_dep = local_state->finishdependency();
         if (fin_dep) {
-            _finish_dependencies.push_back(fin_dep);
+            finish_dependencies.push_back(fin_dep);
         }
     }
+    {
+        std::unique_lock<std::mutex> lc(_dependency_lock);
+        read_dependencies.swap(_read_dependencies);
+        write_dependencies.swap(_write_dependencies);
+        finish_dependencies.swap(_finish_dependencies);
+    }
     return Status::OK();
 }
 
@@ -413,7 +426,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState* 
state, int64_t revocable_m
 }
 
 void PipelineTask::finalize() {
-    std::unique_lock<std::mutex> lc(_release_lock);
+    std::unique_lock<std::mutex> lc(_dependency_lock);
     _finished = true;
     _sink_shared_state.reset();
     _op_shared_states.clear();
@@ -447,7 +460,7 @@ Status PipelineTask::close(Status exec_status) {
 }
 
 std::string PipelineTask::debug_string() {
-    std::unique_lock<std::mutex> lc(_release_lock);
+    std::unique_lock<std::mutex> lc(_dependency_lock);
     fmt::memory_buffer debug_string_buffer;
 
     fmt::format_to(debug_string_buffer, "QueryId: {}\n", 
print_id(query_context()->query_id()));
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 83ad8bec258..bb6587eec28 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -141,7 +141,7 @@ public:
 
     void clear_blocking_state() {
         // We use a lock to assure all dependencies are not deconstructed here.
-        std::unique_lock<std::mutex> lc(_release_lock);
+        std::unique_lock<std::mutex> lc(_dependency_lock);
         if (!_finished) {
             _execution_dep->set_always_ready();
             for (auto* dep : _filter_dependencies) {
@@ -298,7 +298,7 @@ private:
     Dependency* _execution_dep = nullptr;
 
     std::atomic<bool> _finished {false};
-    std::mutex _release_lock;
+    std::mutex _dependency_lock;
 
     std::atomic<bool> _running {false};
     std::atomic<bool> _eos {false};


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to