This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0e5d56fc2ebbd505e15e54d027c1526da1aebd21
Author: Gabriel <[email protected]>
AuthorDate: Tue Jan 23 10:32:06 2024 +0800

    [pipelineX](fix) Fix use-after-free MultiCastSourceDependency (#30199)
---
 be/src/pipeline/exec/analytic_sink_operator.h      |  8 +-
 be/src/pipeline/exec/analytic_source_operator.h    |  7 +-
 .../exec/multi_cast_data_stream_source.cpp         |  7 ++
 .../pipeline/exec/multi_cast_data_stream_source.h  |  2 +-
 be/src/pipeline/exec/multi_cast_data_streamer.cpp  | 32 ++++++--
 be/src/pipeline/exec/multi_cast_data_streamer.h    | 13 +++-
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |  2 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |  2 +-
 be/src/pipeline/pipeline_x/dependency.cpp          |  9 +++
 be/src/pipeline/pipeline_x/dependency.h            | 88 ++++++++++++++++++++--
 .../local_exchange_source_operator.cpp             |  8 +-
 .../local_exchange_source_operator.h               |  1 +
 be/src/pipeline/pipeline_x/operator.cpp            | 11 ++-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  4 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  4 +-
 15 files changed, 170 insertions(+), 28 deletions(-)

diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index e99a0731b34..0214b22c006 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -63,6 +63,10 @@ public:
             : PipelineXSinkLocalState<AnalyticSinkDependency>(parent, state) {}
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status close(RuntimeState* state, Status exec_status) override {
+        _shared_state->release_sink_dep();
+        return PipelineXSinkLocalState<AnalyticSinkDependency>::close(state, 
exec_status);
+    }
 
 private:
     friend class AnalyticSinkOperatorX;
@@ -70,11 +74,11 @@ private:
     bool _refresh_need_more_input() {
         auto need_more_input = 
_whether_need_next_partition(_shared_state->found_partition_end);
         if (need_more_input) {
-            _shared_state->source_dep->block();
+            _dependency->set_block_to_read();
             _dependency->set_ready();
         } else {
             _dependency->block();
-            _shared_state->source_dep->set_ready();
+            _dependency->set_ready_to_read();
         }
         return need_more_input;
     }
diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h
index ba7b00be906..dc86bc95062 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -83,9 +83,12 @@ private:
         auto need_more_input = 
_whether_need_next_partition(_shared_state->found_partition_end);
         if (need_more_input) {
             _dependency->block();
-            _shared_state->sink_dep->set_ready();
+            _dependency->set_ready_to_write();
+            if (!_shared_state->sink_released_flag) {
+                _shared_state->sink_dep->set_ready();
+            }
         } else {
-            _shared_state->sink_dep->block();
+            _dependency->set_block_to_write();
             _dependency->set_ready();
         }
         return need_more_input;
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 6ac06ee5f10..d360e2eb5dd 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -152,6 +152,13 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     return Status::OK();
 }
 
+Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
+    _shared_state->multi_cast_data_streamer.released_dependency(
+            _parent->cast<Parent>()._consumer_id);
+    RETURN_IF_ERROR(Base::close(state));
+    return Status::OK();
+}
+
 Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
                                                        vectorized::Block* 
block,
                                                        SourceState& 
source_state) {
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index baeca2ca7b1..73c506f9cb8 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -117,7 +117,7 @@ public:
         RETURN_IF_ERROR(_acquire_runtime_filter());
         return Status::OK();
     }
-
+    Status close(RuntimeState* state) override;
     friend class MultiCastDataStreamerSourceOperatorX;
 
     RuntimeFilterDependency* filterdependency() override { return 
_filter_dependency.get(); }
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 175a21469b8..f3e44731aef 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -104,15 +104,37 @@ void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
     if (_dependencies.empty()) {
         return;
     }
-    auto* dep = _dependencies[sender_idx];
-    DCHECK(dep);
-    dep->set_ready();
+    if (_dependencies_release_flag[sender_idx]) {
+        return;
+    }
+    {
+        std::unique_lock<std::mutex> lc(_release_lock);
+        if (_dependencies_release_flag[sender_idx]) {
+            return;
+        }
+        auto* dep = _dependencies[sender_idx];
+        DCHECK(dep);
+        dep->set_ready();
+    }
 }
 
 void MultiCastDataStreamer::_set_ready_for_read() {
+    size_t i = 0;
     for (auto* dep : _dependencies) {
-        DCHECK(dep);
-        dep->set_ready();
+        if (_dependencies_release_flag[i]) {
+            i++;
+            continue;
+        }
+        {
+            std::unique_lock<std::mutex> lc(_release_lock);
+            if (_dependencies_release_flag[i]) {
+                i++;
+                continue;
+            }
+            DCHECK(dep);
+            dep->set_ready();
+            i++;
+        }
     }
 }
 
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 5e4179e0cad..7f221d622c0 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -38,10 +38,14 @@ public:
                           bool with_dependencies = false)
             : _row_desc(row_desc),
               _profile(pool->add(new 
RuntimeProfile("MultiCastDataStreamSink"))),
-              _cast_sender_count(cast_sender_count) {
+              _cast_sender_count(cast_sender_count),
+              _dependencies_release_flag(cast_sender_count) {
         _sender_pos_to_read.resize(cast_sender_count, 
_multi_cast_blocks.end());
         if (with_dependencies) {
             _dependencies.resize(cast_sender_count, nullptr);
+            for (size_t i = 0; i < cast_sender_count; i++) {
+                _dependencies_release_flag[i] = false;
+            }
         }
 
         _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
@@ -79,6 +83,11 @@ public:
         _block_reading(sender_idx);
     }
 
+    void released_dependency(int sender_idx) {
+        std::unique_lock<std::mutex> lc(_release_lock);
+        _dependencies_release_flag[sender_idx] = true;
+    }
+
 private:
     void _set_ready_for_read(int sender_idx);
     void _set_ready_for_read();
@@ -97,6 +106,8 @@ private:
     RuntimeProfile::Counter* _process_rows = nullptr;
     RuntimeProfile::Counter* _peak_mem_usage = nullptr;
 
+    std::mutex _release_lock;
+    std::vector<std::atomic<bool>> _dependencies_release_flag;
     std::vector<MultiCastSourceDependency*> _dependencies;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 5ff2c3df2d2..09e5b4b0045 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -219,7 +219,7 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
         
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
                 ->set_ready();
     } else {
-        local_state._shared_state->source_dep->set_ready();
+        local_state._dependency->set_ready_to_read();
     }
 }
 
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
index cb106d76edb..862ad411f5c 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -94,7 +94,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
             
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
                     ->set_ready();
             if (_child_quantity == 1) {
-                local_state._shared_state->source_dep->set_ready();
+                local_state._dependency->set_ready_to_read();
             }
         }
     }
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp
index 86fe982343a..bb27a688820 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -189,4 +189,13 @@ void LocalExchangeSharedState::sub_running_sink_operators() {
     }
 }
 
+LocalExchangeSharedState::LocalExchangeSharedState(int num_instances)
+        : dependencies_release_flag(num_instances) {
+    source_dependencies.resize(num_instances, nullptr);
+    mem_trackers.resize(num_instances, nullptr);
+    for (size_t i = 0; i < num_instances; i++) {
+        dependencies_release_flag[i] = false;
+    }
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 61c251e00b2..172f7383f3e 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -57,9 +57,22 @@ static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L;
 static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
 
 struct BasicSharedState {
-    DependencySPtr source_dep = nullptr;
-    DependencySPtr sink_dep = nullptr;
+    Dependency* source_dep = nullptr;
+    Dependency* sink_dep = nullptr;
 
+    std::atomic_bool source_released_flag {false};
+    std::atomic_bool sink_released_flag {false};
+    std::mutex source_release_lock;
+    std::mutex sink_release_lock;
+
+    void release_source_dep() {
+        std::unique_lock<std::mutex> lc(source_release_lock);
+        source_released_flag = true;
+    }
+    void release_sink_dep() {
+        std::unique_lock<std::mutex> lc(sink_release_lock);
+        sink_released_flag = true;
+    }
     virtual ~BasicSharedState() = default;
 };
 
@@ -108,9 +121,50 @@ public:
     virtual void set_ready();
     void set_ready_to_read() {
         DCHECK(_is_write_dependency) << debug_string();
+        if (_shared_state->source_released_flag) {
+            return;
+        }
+        std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
+        if (_shared_state->source_released_flag) {
+            return;
+        }
         DCHECK(_shared_state->source_dep != nullptr) << debug_string();
         _shared_state->source_dep->set_ready();
     }
+    void set_block_to_read() {
+        DCHECK(_is_write_dependency) << debug_string();
+        if (_shared_state->source_released_flag) {
+            return;
+        }
+        std::unique_lock<std::mutex> lc(_shared_state->source_release_lock);
+        if (_shared_state->source_released_flag) {
+            return;
+        }
+        DCHECK(_shared_state->source_dep != nullptr) << debug_string();
+        _shared_state->source_dep->block();
+    }
+    void set_ready_to_write() {
+        if (_shared_state->sink_released_flag) {
+            return;
+        }
+        std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock);
+        if (_shared_state->sink_released_flag) {
+            return;
+        }
+        DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
+        _shared_state->sink_dep->set_ready();
+    }
+    void set_block_to_write() {
+        if (_shared_state->sink_released_flag) {
+            return;
+        }
+        std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock);
+        if (_shared_state->sink_released_flag) {
+            return;
+        }
+        DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
+        _shared_state->sink_dep->block();
+    }
 
     // Notify downstream pipeline tasks this dependency is blocked.
     virtual void block() { _ready = false; }
@@ -610,25 +664,47 @@ class Exchanger;
 struct LocalExchangeSharedState : public BasicSharedState {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
+    LocalExchangeSharedState(int num_instances);
     std::unique_ptr<Exchanger> exchanger {};
-    std::vector<DependencySPtr> source_dependencies;
+    std::vector<Dependency*> source_dependencies;
+    std::vector<std::atomic_bool> dependencies_release_flag;
     Dependency* sink_dependency;
     std::vector<MemTracker*> mem_trackers;
     std::atomic<size_t> mem_usage = 0;
     std::mutex le_lock;
     void sub_running_sink_operators();
     void _set_ready_for_read() {
+        size_t i = 0;
         for (auto& dep : source_dependencies) {
-            DCHECK(dep);
-            dep->set_ready();
+            if (dependencies_release_flag[i]) {
+                i++;
+                continue;
+            }
+            {
+                std::unique_lock<std::mutex> lc(source_release_lock);
+                if (dependencies_release_flag[i]) {
+                    i++;
+                    continue;
+                }
+                DCHECK(dep);
+                dep->set_ready();
+                i++;
+            }
         }
     }
 
-    void set_dep_by_channel_id(DependencySPtr dep, int channel_id) {
+    void set_dep_by_channel_id(Dependency* dep, int channel_id) {
         source_dependencies[channel_id] = dep;
     }
 
     void set_ready_to_read(int channel_id) {
+        if (dependencies_release_flag[channel_id]) {
+            return;
+        }
+        std::unique_lock<std::mutex> lc(source_release_lock);
+        if (dependencies_release_flag[channel_id]) {
+            return;
+        }
         auto& dep = source_dependencies[channel_id];
         DCHECK(dep) << channel_id;
         dep->set_ready();
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 029dcb15a48..9e98e3b6e8f 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -37,7 +37,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     _channel_id = info.task_idx;
-    _shared_state->set_dep_by_channel_id(info.dependency, _channel_id);
+    _shared_state->set_dep_by_channel_id(_dependency, _channel_id);
     _shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
     _exchanger = _shared_state->exchanger.get();
     DCHECK(_exchanger != nullptr);
@@ -61,6 +61,12 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c
     return fmt::to_string(debug_string_buffer);
 }
 
+Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
+    _shared_state->dependencies_release_flag[_channel_id] = true;
+    RETURN_IF_ERROR(Base::close(state));
+    return Status::OK();
+}
+
 Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                SourceState& source_state) {
     auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index 63d71bbe08b..4c95a84b533 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -48,6 +48,7 @@ public:
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     std::string debug_string(int indentation_level) const override;
+    Status close(RuntimeState* state) override;
 
 private:
     friend class LocalExchangeSourceOperatorX;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index bbb7473f868..e00b1632eb4 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -338,15 +338,15 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
             _shared_state =
                     (typename 
DependencyType::SharedState*)_dependency->shared_state().get();
 
-            _shared_state->source_dep = info.dependency;
-            _shared_state->sink_dep = deps.front();
+            _shared_state->source_dep = info.dependency.get();
+            _shared_state->sink_dep = deps.front().get();
         } else if constexpr (!is_fake_shared) {
             _dependency->set_shared_state(deps.front()->shared_state());
             _shared_state =
                     (typename 
DependencyType::SharedState*)_dependency->shared_state().get();
 
-            _shared_state->source_dep = info.dependency;
-            _shared_state->sink_dep = deps.front();
+            _shared_state->source_dep = info.dependency.get();
+            _shared_state->sink_dep = deps.front().get();
         }
     }
 
@@ -378,6 +378,9 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
+    if (_shared_state) {
+        _shared_state->release_source_dep();
+    }
     if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
         _dependency->clear_shared_state();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 4a16b97b2f3..cf4c312926e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -739,7 +739,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
                                             is_shuffled_hash_join, 
shuffle_idx_to_instance_idx));
 
     // 2. Create and initialize LocalExchangeSharedState.
-    auto shared_state = LocalExchangeSharedState::create_shared();
+    auto shared_state = 
LocalExchangeSharedState::create_shared(_num_instances);
     switch (data_distribution.distribution_type) {
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
@@ -771,8 +771,6 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
         return Status::InternalError("Unsupported local exchange type : " +
                                      
std::to_string((int)data_distribution.distribution_type));
     }
-    shared_state->source_dependencies.resize(_num_instances, nullptr);
-    shared_state->mem_trackers.resize(_num_instances, nullptr);
     auto sink_dep = std::make_shared<LocalExchangeSinkDependency>(sink_id, 
local_exchange_id,
                                                                   
_runtime_state->get_query_ctx());
     sink_dep->set_shared_state(shared_state);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8581980da2f..222fc37a429 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3985,7 +3985,9 @@ public class Coordinator implements CoordInterface {
         if (enablePipelineEngine) {
             for (PipelineExecContext ctx : pipelineExecContexts.values()) {
                 if (enablePipelineXEngine) {
-                    ctx.attachPipelineProfileToFragmentProfile();
+                    synchronized (this) {
+                        ctx.attachPipelineProfileToFragmentProfile();
+                    }
                 } else {
                     ctx.profileStream()
                             .forEach(p -> 
executionProfile.addInstanceProfile(ctx.profileFragmentId, p));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to