This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0e5d56fc2ebbd505e15e54d027c1526da1aebd21 Author: Gabriel <[email protected]> AuthorDate: Tue Jan 23 10:32:06 2024 +0800 [pipelineX](fix) Fix use-after-free MultiCastSourceDependency (#30199) --- be/src/pipeline/exec/analytic_sink_operator.h | 8 +- be/src/pipeline/exec/analytic_source_operator.h | 7 +- .../exec/multi_cast_data_stream_source.cpp | 7 ++ .../pipeline/exec/multi_cast_data_stream_source.h | 2 +- be/src/pipeline/exec/multi_cast_data_streamer.cpp | 32 ++++++-- be/src/pipeline/exec/multi_cast_data_streamer.h | 13 +++- be/src/pipeline/exec/set_probe_sink_operator.cpp | 2 +- be/src/pipeline/exec/set_sink_operator.cpp | 2 +- be/src/pipeline/pipeline_x/dependency.cpp | 9 +++ be/src/pipeline/pipeline_x/dependency.h | 88 ++++++++++++++++++++-- .../local_exchange_source_operator.cpp | 8 +- .../local_exchange_source_operator.h | 1 + be/src/pipeline/pipeline_x/operator.cpp | 11 ++- .../pipeline_x/pipeline_x_fragment_context.cpp | 4 +- .../main/java/org/apache/doris/qe/Coordinator.java | 4 +- 15 files changed, 170 insertions(+), 28 deletions(-) diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index e99a0731b34..0214b22c006 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -63,6 +63,10 @@ public: : PipelineXSinkLocalState<AnalyticSinkDependency>(parent, state) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status close(RuntimeState* state, Status exec_status) override { + _shared_state->release_sink_dep(); + return PipelineXSinkLocalState<AnalyticSinkDependency>::close(state, exec_status); + } private: friend class AnalyticSinkOperatorX; @@ -70,11 +74,11 @@ private: bool _refresh_need_more_input() { auto need_more_input = _whether_need_next_partition(_shared_state->found_partition_end); if (need_more_input) { - _shared_state->source_dep->block(); + _dependency->set_block_to_read(); _dependency->set_ready(); } else { _dependency->block(); - _shared_state->source_dep->set_ready(); + _dependency->set_ready_to_read(); } return need_more_input; } diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index ba7b00be906..dc86bc95062 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -83,9 +83,12 @@ private: auto need_more_input = _whether_need_next_partition(_shared_state->found_partition_end); if (need_more_input) { _dependency->block(); - _shared_state->sink_dep->set_ready(); + _dependency->set_ready_to_write(); + if (!_shared_state->sink_released_flag) { + _shared_state->sink_dep->set_ready(); + } } else { - _shared_state->sink_dep->block(); + _dependency->set_block_to_write(); _dependency->set_ready(); } return need_more_input; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 6ac06ee5f10..d360e2eb5dd 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -152,6 +152,13 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState return Status::OK(); } +Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) { + _shared_state->multi_cast_data_streamer.released_dependency( + _parent->cast<Parent>()._consumer_id); + RETURN_IF_ERROR(Base::close(state)); + return Status::OK(); +} + Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index baeca2ca7b1..73c506f9cb8 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -117,7 +117,7 @@ public: RETURN_IF_ERROR(_acquire_runtime_filter()); return Status::OK(); } - + Status close(RuntimeState* state) override; friend class MultiCastDataStreamerSourceOperatorX; RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); } diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp index 175a21469b8..f3e44731aef 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp +++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp @@ -104,15 +104,37 @@ void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) { if (_dependencies.empty()) { return; } - auto* dep = _dependencies[sender_idx]; - DCHECK(dep); - dep->set_ready(); + if (_dependencies_release_flag[sender_idx]) { + return; + } + { + std::unique_lock<std::mutex> lc(_release_lock); + if (_dependencies_release_flag[sender_idx]) { + return; + } + auto* dep = _dependencies[sender_idx]; + DCHECK(dep); + dep->set_ready(); + } } void MultiCastDataStreamer::_set_ready_for_read() { + size_t i = 0; for (auto* dep : _dependencies) { - DCHECK(dep); - dep->set_ready(); + if (_dependencies_release_flag[i]) { + i++; + continue; + } + { + std::unique_lock<std::mutex> lc(_release_lock); + if (_dependencies_release_flag[i]) { + i++; + continue; + } + DCHECK(dep); + dep->set_ready(); + i++; + } } } diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h index 5e4179e0cad..7f221d622c0 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.h +++ b/be/src/pipeline/exec/multi_cast_data_streamer.h @@ -38,10 +38,14 @@ public: bool with_dependencies = false) : _row_desc(row_desc), _profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))), - _cast_sender_count(cast_sender_count) { + _cast_sender_count(cast_sender_count), + _dependencies_release_flag(cast_sender_count) { _sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end()); if (with_dependencies) { _dependencies.resize(cast_sender_count, nullptr); + for (size_t i = 0; i < cast_sender_count; i++) { + _dependencies_release_flag[i] = false; + } } _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES); @@ -79,6 +83,11 @@ public: _block_reading(sender_idx); } + void released_dependency(int sender_idx) { + std::unique_lock<std::mutex> lc(_release_lock); + _dependencies_release_flag[sender_idx] = true; + } + private: void _set_ready_for_read(int sender_idx); void _set_ready_for_read(); @@ -97,6 +106,8 @@ private: RuntimeProfile::Counter* _process_rows = nullptr; RuntimeProfile::Counter* _peak_mem_usage = nullptr; + std::mutex _release_lock; + std::vector<std::atomic<bool>> _dependencies_release_flag; std::vector<MultiCastSourceDependency*> _dependencies; }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 5ff2c3df2d2..09e5b4b0045 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -219,7 +219,7 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe( local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1] ->set_ready(); } else { - local_state._shared_state->source_dep->set_ready(); + local_state._dependency->set_ready_to_read(); } } diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index cb106d76edb..862ad411f5c 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -94,7 +94,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1] ->set_ready(); if (_child_quantity == 1) { - local_state._shared_state->source_dep->set_ready(); + local_state._dependency->set_ready_to_read(); } } } diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 86fe982343a..bb27a688820 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -189,4 +189,13 @@ void LocalExchangeSharedState::sub_running_sink_operators() { } } +LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) + : dependencies_release_flag(num_instances) { + source_dependencies.resize(num_instances, nullptr); + mem_trackers.resize(num_instances, nullptr); + for (size_t i = 0; i < num_instances; i++) { + dependencies_release_flag[i] = false; + } +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 61c251e00b2..172f7383f3e 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -57,9 +57,22 @@ static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L; static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD); struct BasicSharedState { - DependencySPtr source_dep = nullptr; - DependencySPtr sink_dep = nullptr; + Dependency* source_dep = nullptr; + Dependency* sink_dep = nullptr; + std::atomic_bool source_released_flag {false}; + std::atomic_bool sink_released_flag {false}; + std::mutex source_release_lock; + std::mutex sink_release_lock; + + void release_source_dep() { + std::unique_lock<std::mutex> lc(source_release_lock); + source_released_flag = true; + } + void release_sink_dep() { + std::unique_lock<std::mutex> lc(sink_release_lock); + sink_released_flag = true; + } virtual ~BasicSharedState() = default; }; @@ -108,9 +121,50 @@ public: virtual void set_ready(); void set_ready_to_read() { DCHECK(_is_write_dependency) << debug_string(); + if (_shared_state->source_released_flag) { + return; + } + std::unique_lock<std::mutex> lc(_shared_state->source_release_lock); + if (_shared_state->source_released_flag) { + return; + } DCHECK(_shared_state->source_dep != nullptr) << debug_string(); _shared_state->source_dep->set_ready(); } + void set_block_to_read() { + DCHECK(_is_write_dependency) << debug_string(); + if (_shared_state->source_released_flag) { + return; + } + std::unique_lock<std::mutex> lc(_shared_state->source_release_lock); + if (_shared_state->source_released_flag) { + return; + } + DCHECK(_shared_state->source_dep != nullptr) << debug_string(); + _shared_state->source_dep->block(); + } + void set_ready_to_write() { + if (_shared_state->sink_released_flag) { + return; + } + std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock); + if (_shared_state->sink_released_flag) { + return; + } + DCHECK(_shared_state->sink_dep != nullptr) << debug_string(); + _shared_state->sink_dep->set_ready(); + } + void set_block_to_write() { + if (_shared_state->sink_released_flag) { + return; + } + std::unique_lock<std::mutex> lc(_shared_state->sink_release_lock); + if (_shared_state->sink_released_flag) { + return; + } + DCHECK(_shared_state->sink_dep != nullptr) << debug_string(); + _shared_state->sink_dep->block(); + } // Notify downstream pipeline tasks this dependency is blocked. virtual void block() { _ready = false; } @@ -610,25 +664,47 @@ class Exchanger; struct LocalExchangeSharedState : public BasicSharedState { public: ENABLE_FACTORY_CREATOR(LocalExchangeSharedState); + LocalExchangeSharedState(int num_instances); std::unique_ptr<Exchanger> exchanger {}; - std::vector<DependencySPtr> source_dependencies; + std::vector<Dependency*> source_dependencies; + std::vector<std::atomic_bool> dependencies_release_flag; Dependency* sink_dependency; std::vector<MemTracker*> mem_trackers; std::atomic<size_t> mem_usage = 0; std::mutex le_lock; void sub_running_sink_operators(); void _set_ready_for_read() { + size_t i = 0; for (auto& dep : source_dependencies) { - DCHECK(dep); - dep->set_ready(); + if (dependencies_release_flag[i]) { + i++; + continue; + } + { + std::unique_lock<std::mutex> lc(source_release_lock); + if (dependencies_release_flag[i]) { + i++; + continue; + } + DCHECK(dep); + dep->set_ready(); + i++; + } } } - void set_dep_by_channel_id(DependencySPtr dep, int channel_id) { + void set_dep_by_channel_id(Dependency* dep, int channel_id) { source_dependencies[channel_id] = dep; } void set_ready_to_read(int channel_id) { + if (dependencies_release_flag[channel_id]) { + return; + } + std::unique_lock<std::mutex> lc(source_release_lock); + if (dependencies_release_flag[channel_id]) { + return; + } auto& dep = source_dependencies[channel_id]; DCHECK(dep) << channel_id; dep->set_ready(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 029dcb15a48..9e98e3b6e8f 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -37,7 +37,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _channel_id = info.task_idx; - _shared_state->set_dep_by_channel_id(info.dependency, _channel_id); + _shared_state->set_dep_by_channel_id(_dependency, _channel_id); _shared_state->mem_trackers[_channel_id] = _mem_tracker.get(); _exchanger = _shared_state->exchanger.get(); DCHECK(_exchanger != nullptr); @@ -61,6 +61,12 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c return fmt::to_string(debug_string_buffer); } +Status LocalExchangeSourceLocalState::close(RuntimeState* state) { + _shared_state->dependencies_release_flag[_channel_id] = true; + RETURN_IF_ERROR(Base::close(state)); + return Status::OK(); +} + Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index 63d71bbe08b..4c95a84b533 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -48,6 +48,7 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override; std::string debug_string(int indentation_level) const override; + Status close(RuntimeState* state) override; private: friend class LocalExchangeSourceOperatorX; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index bbb7473f868..e00b1632eb4 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -338,15 +338,15 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState _shared_state = (typename DependencyType::SharedState*)_dependency->shared_state().get(); - _shared_state->source_dep = info.dependency; - _shared_state->sink_dep = deps.front(); + _shared_state->source_dep = info.dependency.get(); + _shared_state->sink_dep = deps.front().get(); } else if constexpr (!is_fake_shared) { _dependency->set_shared_state(deps.front()->shared_state()); _shared_state = (typename DependencyType::SharedState*)_dependency->shared_state().get(); - _shared_state->source_dep = info.dependency; - _shared_state->sink_dep = deps.front(); + _shared_state->source_dep = info.dependency.get(); + _shared_state->sink_dep = deps.front().get(); } } @@ -378,6 +378,9 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) { if (_closed) { return Status::OK(); } + if (_shared_state) { + _shared_state->release_source_dep(); + } if constexpr (!std::is_same_v<DependencyType, FakeDependency>) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); _dependency->clear_shared_state(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 4a16b97b2f3..cf4c312926e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -739,7 +739,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( is_shuffled_hash_join, shuffle_idx_to_instance_idx)); // 2. Create and initialize LocalExchangeSharedState. - auto shared_state = LocalExchangeSharedState::create_shared(); + auto shared_state = LocalExchangeSharedState::create_shared(_num_instances); switch (data_distribution.distribution_type) { case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique( @@ -771,8 +771,6 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( return Status::InternalError("Unsupported local exchange type : " + std::to_string((int)data_distribution.distribution_type)); } - shared_state->source_dependencies.resize(_num_instances, nullptr); - shared_state->mem_trackers.resize(_num_instances, nullptr); auto sink_dep = std::make_shared<LocalExchangeSinkDependency>(sink_id, local_exchange_id, _runtime_state->get_query_ctx()); sink_dep->set_shared_state(shared_state); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 8581980da2f..222fc37a429 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -3985,7 +3985,9 @@ public class Coordinator implements CoordInterface { if (enablePipelineEngine) { for (PipelineExecContext ctx : pipelineExecContexts.values()) { if (enablePipelineXEngine) { - ctx.attachPipelineProfileToFragmentProfile(); + synchronized (this) { + ctx.attachPipelineProfileToFragmentProfile(); + } } else { ctx.profileStream() .forEach(p -> executionProfile.addInstanceProfile(ctx.profileFragmentId, p)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
