This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2b7fa9d6bbd42fdb9eca2a21c8e0cc4e7dcec10a Author: Gabriel <[email protected]> AuthorDate: Thu Feb 29 09:58:27 2024 +0800 [pipelineX](refactor) Rebuild relationship between dep and operator (#31487) --- be/src/pipeline/exec/analytic_source_operator.h | 1 - be/src/pipeline/exec/es_scan_operator.cpp | 2 +- be/src/pipeline/exec/exchange_sink_operator.cpp | 7 -- be/src/pipeline/exec/exchange_sink_operator.h | 18 ++- be/src/pipeline/exec/exchange_source_operator.cpp | 5 +- be/src/pipeline/exec/exchange_source_operator.h | 11 +- be/src/pipeline/exec/file_scan_operator.cpp | 2 +- be/src/pipeline/exec/multi_cast_data_stream_sink.h | 11 +- be/src/pipeline/exec/olap_scan_operator.cpp | 4 +- be/src/pipeline/exec/scan_operator.cpp | 28 +++-- be/src/pipeline/exec/scan_operator.h | 9 +- be/src/pipeline/exec/set_probe_sink_operator.h | 2 + be/src/pipeline/exec/set_sink_operator.cpp | 2 - be/src/pipeline/exec/set_sink_operator.h | 5 +- be/src/pipeline/exec/set_source_operator.cpp | 7 +- be/src/pipeline/exec/set_source_operator.h | 6 +- be/src/pipeline/exec/union_sink_operator.h | 13 ++ be/src/pipeline/exec/union_source_operator.cpp | 32 +++-- be/src/pipeline/exec/union_source_operator.h | 4 + be/src/pipeline/pipeline_x/dependency.cpp | 29 +++-- be/src/pipeline/pipeline_x/dependency.h | 87 ++++++-------- .../local_exchange_source_operator.cpp | 1 - be/src/pipeline/pipeline_x/operator.cpp | 131 ++++++--------------- be/src/pipeline/pipeline_x/operator.h | 35 +++--- .../pipeline_x/pipeline_x_fragment_context.cpp | 15 ++- .../pipeline_x/pipeline_x_fragment_context.h | 2 - be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 44 +++---- be/src/pipeline/pipeline_x/pipeline_x_task.h | 70 +++++------ 28 files changed, 266 insertions(+), 317 deletions(-) diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index e98a50186e9..b2ab5b24b3c 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -76,7 +76,6 @@ private: if (need_more_input) { _dependency->block(); _dependency->set_ready_to_write(); - _shared_state->sink_dep->set_ready(); } else { _dependency->set_block_to_write(); _dependency->set_ready(); diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index 96283ac5052..c00ee6917ea 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -67,7 +67,7 @@ Status EsScanLocalState::_process_conjuncts() { Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) { if (_scan_ranges.empty()) { _eos = true; - _dependency->set_ready(); + _scan_dependency->set_ready(); return Status::OK(); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index a43cc07b92a..1220230a343 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -170,12 +170,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf id, p._dest_node_id, _sender_id, _state->be_number(), state); register_channels(_sink_buffer.get()); - auto* _exchange_sink_dependency = _dependency; _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "ExchangeSinkQueueDependency", true, state->get_query_ctx()); _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); - _exchange_sink_dependency->add_child(_queue_dependency); if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && !only_local_exchange) { _broadcast_dependency = @@ -186,7 +184,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf for (int i = 0; i < config::num_broadcast_buffer; ++i) { _broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared()); } - _exchange_sink_dependency->add_child(_broadcast_dependency); _wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name); @@ -194,19 +191,15 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf size_t dep_id = 0; _local_channels_dependency.resize(local_size); _wait_channel_timer.resize(local_size); - auto deps_for_channels = AndDependency::create_shared( - _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); for (auto channel : channels) { if (channel->is_local()) { _local_channels_dependency[dep_id] = channel->get_local_channel_dependency(); DCHECK(_local_channels_dependency[dep_id] != nullptr); - deps_for_channels->add_child(_local_channels_dependency[dep_id]); _wait_channel_timer[dep_id] = ADD_CHILD_TIMER( _profile, fmt::format("WaitForLocalExchangeBuffer{}", dep_id), timer_name); dep_id++; } } - _exchange_sink_dependency->add_child(deps_for_channels); } if (p._part_type == TPartitionType::HASH_PARTITIONED) { _partition_count = channels.size(); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 43919236a86..4a2f1a3dfd9 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -64,9 +64,9 @@ private: int _mult_cast_id = -1; }; -class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndSharedState> { +class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState); - using Base = PipelineXSinkLocalState<AndSharedState>; + using Base = PipelineXSinkLocalState<>; public: ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) @@ -79,6 +79,16 @@ public: state->get_query_ctx()); } + std::vector<Dependency*> dependencies() const override { + std::vector<Dependency*> dep_vec; + dep_vec.push_back(_queue_dependency.get()); + if (_broadcast_dependency) { + dep_vec.push_back(_broadcast_dependency.get()); + } + std::for_each(_local_channels_dependency.begin(), _local_channels_dependency.end(), + [&](std::shared_ptr<Dependency> dep) { dep_vec.push_back(dep.get()); }); + return dep_vec; + } Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; @@ -154,8 +164,8 @@ private: vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer; - std::shared_ptr<Dependency> _queue_dependency; - std::shared_ptr<Dependency> _broadcast_dependency; + std::shared_ptr<Dependency> _queue_dependency = nullptr; + std::shared_ptr<Dependency> _broadcast_dependency = nullptr; /** * We use this to control the execution for local exchange. diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 2c519319dd0..664e576e1ce 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -74,7 +74,6 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(), profile(), p.is_merging()); - auto* source_dependency = _dependency; const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); metrics.resize(queues.size()); @@ -82,10 +81,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { deps[i] = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "SHUFFLE_DATA_DEPENDENCY", state->get_query_ctx()); queues[i]->set_dependency(deps[i]); - source_dependency->add_child(deps[i]); } - static const std::string timer_name = - "WaitForDependency[" + source_dependency->name() + "]Time"; + static const std::string timer_name = "WaitForDependencyTime"; _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, timer_name, 1); for (size_t i = 0; i < queues.size(); i++) { metrics[i] = ADD_CHILD_TIMER_WITH_LEVEL(_runtime_profile, fmt::format("WaitForData{}", i), diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 1b106f2e259..6176ad5b7f7 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -51,17 +51,24 @@ public: }; class ExchangeSourceOperatorX; -class ExchangeLocalState final : public PipelineXLocalState<AndSharedState> { +class ExchangeLocalState final : public PipelineXLocalState<> { ENABLE_FACTORY_CREATOR(ExchangeLocalState); public: - using Base = PipelineXLocalState<AndSharedState>; + using Base = PipelineXLocalState<>; ExchangeLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; std::string debug_string(int indentation_level) const override; + + std::vector<Dependency*> dependencies() const override { + std::vector<Dependency*> dep_vec; + std::for_each(deps.begin(), deps.end(), + [&](std::shared_ptr<Dependency> dep) { dep_vec.push_back(dep.get()); }); + return dep_vec; + } std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr; doris::vectorized::VSortExecExprs vsort_exec_exprs; int64_t num_rows_skipped; diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index ce0c5042cff..ac193147dfb 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -33,7 +33,7 @@ namespace doris::pipeline { Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) { if (_scan_ranges.empty()) { _eos = true; - _dependency->set_ready(); + _scan_dependency->set_ready(); return Status::OK(); } diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index d9a29dfa0d4..b4886f089ef 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -92,11 +92,16 @@ public: const RowDescriptor& row_desc() const override { return _row_desc; } - std::shared_ptr<MultiCastSharedState> create_multi_cast_data_streamer() { - auto multi_cast_data_streamer = + std::shared_ptr<BasicSharedState> create_shared_state() const override { + std::shared_ptr<BasicSharedState> ss = std::make_shared<MultiCastSharedState>(_row_desc, _pool, _cast_sender_count); - return multi_cast_data_streamer; + ss->id = operator_id(); + for (auto& dest : dests_id()) { + ss->related_op_ids.insert(dest); + } + return ss; } + const TMultiCastDataStreamSink& sink_node() { return _sink; } private: diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index f443cacf040..0aab714449e 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -225,7 +225,7 @@ bool OlapScanLocalState::_storage_no_merge() { Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) { if (_scan_ranges.empty()) { _eos = true; - _dependency->set_ready(); + _scan_dependency->set_ready(); return Status::OK(); } SCOPED_TIMER(_scanner_init_timer); @@ -486,7 +486,7 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() { } if (eos) { _eos = true; - _dependency->set_ready(); + _scan_dependency->set_ready(); } for (auto& iter : _colname_to_value_range) { diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index e3b0fe38a1d..0fb92d793a8 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -111,7 +111,12 @@ bool ScanLocalState<Derived>::should_run_serial() const { template <typename Derived> Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) { - RETURN_IF_ERROR(PipelineXLocalState<EmptySharedState>::init(state, info)); + RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); + _scan_dependency = + Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + _parent->get_name() + "_DEPENDENCY", state->get_query_ctx()); + _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( + _runtime_profile, "WaitForDependency[" + _scan_dependency->name() + "]Time", 1); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast<typename Derived::Parent>(); @@ -252,7 +257,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() { [&](auto&& range) { if (range.is_empty_value_range()) { _eos = true; - _dependency->set_ready(); + _scan_dependency->set_ready(); } }, it.second.second); @@ -543,8 +548,7 @@ template <typename Derived> std::string ScanLocalState<Derived>::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}, _eos = {}", - PipelineXLocalState<EmptySharedState>::debug_string(indentation_level), - _eos.load()); + PipelineXLocalState<>::debug_string(indentation_level), _eos.load()); if (_scanner_ctx) { fmt::format_to(debug_string_buffer, ""); fmt::format_to(debug_string_buffer, ", Scanner Context: {}", _scanner_ctx->debug_string()); @@ -587,7 +591,7 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr, if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) { *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE; _eos = true; - _dependency->set_ready(); + _scan_dependency->set_ready(); } } else if (const vectorized::ColumnVector<vectorized::UInt8>* bool_column = check_and_get_column<vectorized::ColumnVector<vectorized::UInt8>>( @@ -605,7 +609,7 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr, if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) { *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE; _eos = true; - _dependency->set_ready(); + _scan_dependency->set_ready(); } } else { LOG(WARNING) << "Constant predicate in scan node should return a bool column with " @@ -803,7 +807,7 @@ Status ScanLocalState<Derived>::_normalize_not_in_and_not_eq_predicate( auto fn_name = std::string(""); if (!is_fixed_range && state->null_in_set) { _eos = true; - _dependency->set_ready(); + _scan_dependency->set_ready(); } while (iter->has_next()) { // column not in (nullptr) is always true @@ -1201,7 +1205,7 @@ Status ScanLocalState<Derived>::_prepare_scanners() { } if (scanners.empty()) { _eos = true; - _dependency->set_ready(); + _scan_dependency->set_ready(); } else { for (auto& scanner : scanners) { scanner->set_query_statistics(_query_statistics.get()); @@ -1218,7 +1222,7 @@ Status ScanLocalState<Derived>::_start_scanners( auto& p = _parent->cast<typename Derived::Parent>(); _scanner_ctx = PipXScannerContext::create_shared( state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), - state()->scan_queue_mem_limit(), _dependency->shared_from_this()); + state()->scan_queue_mem_limit(), _scan_dependency); return Status::OK(); } @@ -1404,7 +1408,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - COUNTER_UPDATE(exec_time_counter(), _dependency->watcher_elapse_time()); + COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time()); COUNTER_UPDATE(exec_time_counter(), _filter_dependency->watcher_elapse_time()); SCOPED_TIMER(_close_timer); @@ -1412,10 +1416,10 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) { if (_scanner_ctx) { _scanner_ctx->stop_scanners(state); } - COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); + COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time()); COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time()); - return PipelineXLocalState<EmptySharedState>::close(state); + return PipelineXLocalState<>::close(state); } template <typename LocalStateType> diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 0b1b2f46da7..958581b2f6f 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -57,11 +57,10 @@ public: std::string debug_string() const override; }; -class ScanLocalStateBase : public PipelineXLocalState<EmptySharedState>, - public vectorized::RuntimeFilterConsumer { +class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::RuntimeFilterConsumer { public: ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent) - : PipelineXLocalState<EmptySharedState>(state, parent), + : PipelineXLocalState<>(state, parent), vectorized::RuntimeFilterConsumer(parent->node_id(), parent->runtime_filter_descs(), parent->row_descriptor(), _conjuncts) {} virtual ~ScanLocalStateBase() = default; @@ -97,6 +96,8 @@ protected: std::atomic<bool> _opened {false}; + DependencySPtr _scan_dependency = nullptr; + std::shared_ptr<RuntimeProfile> _scanner_profile; RuntimeProfile::Counter* _scanner_sched_counter = nullptr; RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr; @@ -166,6 +167,8 @@ class ScanLocalState : public ScanLocalStateBase { RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }; + std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; } + protected: template <typename LocalStateType> friend class ScanOperatorX; diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 2e8500cd452..6b4197ea94b 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -131,6 +131,8 @@ public: : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } + std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; } + private: void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state); Status _extract_probe_column(SetProbeSinkLocalState<is_intersect>& local_state, diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 9080bb24504..7ef4871555d 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -198,10 +198,8 @@ Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState // Create result_expr_ctx_lists_ from thrift exprs. if (tnode.node_type == TPlanNodeType::type::INTERSECT_NODE) { result_texpr_lists = &(tnode.intersect_node.result_expr_lists); - _child_quantity = tnode.intersect_node.result_expr_lists.size(); } else if (tnode.node_type == TPlanNodeType::type::EXCEPT_NODE) { result_texpr_lists = &(tnode.except_node.result_expr_lists); - _child_quantity = tnode.except_node.result_expr_lists.size(); } else { return Status::NotSupported("Not Implemented, Check The Operation Node."); } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 63b2b89380b..24f23593ea0 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -99,6 +99,9 @@ public: const DescriptorTbl& descs) : Base(sink_id, tnode.node_id, tnode.node_id), _cur_child_id(child_id), + _child_quantity(tnode.node_type == TPlanNodeType::type::INTERSECT_NODE + ? tnode.intersect_node.result_expr_lists.size() + : tnode.except_node.result_expr_lists.size()), _is_colocate(is_intersect ? tnode.intersect_node.is_colocate : tnode.except_node.is_colocate), _partition_exprs(is_intersect ? tnode.intersect_node.result_expr_lists[child_id] @@ -131,7 +134,7 @@ private: vectorized::Block& block, vectorized::ColumnRawPtrs& raw_ptrs); const int _cur_child_id; - int _child_quantity; + const int _child_quantity; // every child has its result expr list vectorized::VExprContextSPtrs _child_exprs; const bool _is_colocate; diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index 954ca28dc9b..15524a25a7b 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -54,11 +54,8 @@ Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, LocalStateIn RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); - auto& deps = info.upstream_dependencies; - _shared_state->probe_finished_children_dependency.resize(deps.size(), nullptr); - for (auto& dep : deps) { - dep->set_shared_state(_dependency->shared_state()); - } + _shared_state->probe_finished_children_dependency.resize( + _parent->cast<SetSourceOperatorX<is_intersect>>()._child_quantity, nullptr); return Status::OK(); } diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h index d7026f015cf..1c5cf162940 100644 --- a/be/src/pipeline/exec/set_source_operator.h +++ b/be/src/pipeline/exec/set_source_operator.h @@ -85,7 +85,10 @@ public: SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : Base(pool, tnode, operator_id, descs) {}; + : Base(pool, tnode, operator_id, descs), + _child_quantity(tnode.node_type == TPlanNodeType::type::INTERSECT_NODE + ? tnode.intersect_node.result_expr_lists.size() + : tnode.except_node.result_expr_lists.size()) {}; ~SetSourceOperatorX() override = default; [[nodiscard]] bool is_source() const override { return true; } @@ -105,6 +108,7 @@ private: void _add_result_columns(SetSourceLocalState<is_intersect>& local_state, vectorized::RowRefListWithFlags& value, int& block_size); + const int _child_quantity; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 5fd670f0ef7..6d79d3f2a9f 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -111,6 +111,19 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; + std::shared_ptr<BasicSharedState> create_shared_state() const override { + if (_cur_child_id > 0) { + return nullptr; + } else { + std::shared_ptr<BasicSharedState> ss = std::make_shared<UnionSharedState>(_child_size); + ss->id = operator_id(); + for (auto& dest : dests_id()) { + ss->related_op_ids.insert(dest); + } + return ss; + } + } + private: int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index f2f4ca82e4c..dc1de2900b9 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -111,15 +111,18 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast<Parent>(); - int child_count = p.get_child_count(); - if (child_count != 0) { - auto& deps = info.upstream_dependencies; - for (auto& dep : deps) { - dep->set_shared_state(_dependency->shared_state()); - } + if (p.get_child_count() != 0) { + ((UnionSharedState*)_dependency->shared_state()) + ->data_queue.set_source_dependency(_shared_state->source_deps.front()); + } else { + _only_const_dependency = Dependency::create_shared( + _parent->operator_id(), _parent->node_id(), _parent->get_name() + "_DEPENDENCY", + state->get_query_ctx()); + _dependency = _only_const_dependency.get(); + _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( + _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); } - ((UnionSharedState*)_dependency->shared_state()) - ->data_queue.set_source_dependency(info.dependency); + // Const exprs materialized by this node. These exprs don't refer to any children. // Only materialized by the first fragment instance to avoid duplication. if (state->per_fragment_instance_idx() == 0) { @@ -138,7 +141,8 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(clone_expr_list(_const_expr_list, other_expr_list)); } } - if (child_count == 0) { + + if (p.get_child_count() == 0) { _dependency->set_ready(); } return Status::OK(); @@ -147,9 +151,11 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { std::string UnionSourceLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); - fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})", - _shared_state->data_queue.is_all_finish(), - _shared_state->data_queue.remaining_has_data()); + if (_shared_state) { + fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})", + _shared_state->data_queue.is_all_finish(), + _shared_state->data_queue.remaining_has_data()); + } return fmt::to_string(debug_string_buffer); } @@ -161,7 +167,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b RETURN_IF_ERROR(get_next_const(state, block)); } local_state._need_read_for_const_expr = has_more_const(state); - } else { + } else if (_child_size != 0) { std::unique_ptr<vectorized::Block> output_block = vectorized::Block::create_unique(); int child_idx = 0; RETURN_IF_ERROR(local_state._shared_state->data_queue.get_block_from_queue(&output_block, diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 69e81bcd4fc..40d02324cbd 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -87,6 +87,10 @@ private: bool _need_read_for_const_expr {true}; int _const_expr_list_idx {0}; std::vector<vectorized::VExprContextSPtrs> _const_expr_lists; + + // If this operator has no children, there is no shared state which owns dependency. So we + // use this local state to hold this dependency. + DependencySPtr _only_const_dependency = nullptr; }; class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> { diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 56045118a94..adbadcfb835 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -29,6 +29,22 @@ namespace doris::pipeline { +Dependency* BasicSharedState::create_source_dependency(int operator_id, int node_id, + std::string name, QueryContext* ctx) { + source_deps.push_back( + std::make_shared<Dependency>(operator_id, node_id, name + "_DEPENDENCY", ctx)); + source_deps.back()->set_shared_state(this); + return source_deps.back().get(); +} + +Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, std::string name, + QueryContext* ctx) { + sink_deps.push_back( + std::make_shared<Dependency>(dest_id, node_id, name + "_DEPENDENCY", true, ctx)); + sink_deps.back()->set_shared_state(this); + return sink_deps.back().get(); +} + void Dependency::_add_block_task(PipelineXTask* task) { DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != task) << "Duplicate task: " << task->debug_string(); @@ -103,17 +119,6 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) { return fmt::to_string(debug_string_buffer); } -std::string AndDependency::debug_string(int indentation_level) { - fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[", - std::string(indentation_level * 2, ' '), _name, _node_id); - for (auto& child : _children) { - fmt::format_to(debug_string_buffer, "{}, \n", child->debug_string(indentation_level = 1)); - } - fmt::format_to(debug_string_buffer, "{}]", std::string(indentation_level * 2, ' ')); - return fmt::to_string(debug_string_buffer); -} - bool RuntimeFilterTimer::has_ready() { std::unique_lock<std::mutex> lc(_lock); return _is_ready; @@ -193,7 +198,7 @@ void LocalExchangeSharedState::sub_running_sink_operators() { } LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) { - source_dependencies.resize(num_instances, nullptr); + source_deps.resize(num_instances, nullptr); mem_trackers.resize(num_instances, nullptr); } diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 3de90fa915a..1733a8fd291 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -70,9 +70,18 @@ struct BasicSharedState { << " and expect type is" << typeid(TARGET).name(); return reinterpret_cast<const TARGET*>(this); } - DependencySPtr source_dep = nullptr; - DependencySPtr sink_dep = nullptr; + std::vector<DependencySPtr> source_deps; + std::vector<DependencySPtr> sink_deps; + int id = 0; + std::set<int> related_op_ids; + virtual ~BasicSharedState() = default; + + Dependency* create_source_dependency(int operator_id, int node_id, std::string name, + QueryContext* ctx); + + Dependency* create_sink_dependency(int dest_id, int node_id, std::string name, + QueryContext* ctx); }; class Dependency : public std::enable_shared_from_this<Dependency> { @@ -94,22 +103,15 @@ public: _query_ctx(query_ctx) {} virtual ~Dependency() = default; + bool is_write_dependency() const { return _is_write_dependency; } [[nodiscard]] int id() const { return _id; } [[nodiscard]] virtual std::string name() const { return _name; } - virtual void add_child(std::shared_ptr<Dependency> child) { - LOG(FATAL) << "Only AndDependency could add child, it is wrong usage"; - } BasicSharedState* shared_state() { return _shared_state; } void set_shared_state(BasicSharedState* shared_state) { _shared_state = shared_state; } virtual std::string debug_string(int indentation_level = 0); // Start the watcher. We use it to count how long this dependency block the current pipeline task. - void start_watcher() { - for (auto& child : _children) { - child->start_watcher(); - } - _watcher.start(); - } + void start_watcher() { _watcher.start(); } [[nodiscard]] int64_t watcher_elapse_time() { return _watcher.elapsed_time(); } // Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready. @@ -118,21 +120,21 @@ public: void set_ready(); void set_ready_to_read() { DCHECK(_is_write_dependency) << debug_string(); - DCHECK(_shared_state->source_dep != nullptr) << debug_string(); - _shared_state->source_dep->set_ready(); + DCHECK(_shared_state->source_deps.size() == 1) << debug_string(); + _shared_state->source_deps.front()->set_ready(); } void set_block_to_read() { DCHECK(_is_write_dependency) << debug_string(); - DCHECK(_shared_state->source_dep != nullptr) << debug_string(); - _shared_state->source_dep->block(); + DCHECK(_shared_state->source_deps.size() == 1) << debug_string(); + _shared_state->source_deps.front()->block(); } void set_ready_to_write() { - DCHECK(_shared_state->sink_dep != nullptr) << debug_string(); - _shared_state->sink_dep->set_ready(); + DCHECK(_shared_state->sink_deps.size() == 1) << debug_string(); + _shared_state->sink_deps.front()->set_ready(); } void set_block_to_write() { - DCHECK(_shared_state->sink_dep != nullptr) << debug_string(); - _shared_state->sink_dep->block(); + DCHECK(_shared_state->sink_deps.size() == 1) << debug_string(); + _shared_state->sink_deps.front()->block(); } // Notify downstream pipeline tasks this dependency is blocked. @@ -172,7 +174,6 @@ protected: BasicSharedState* _shared_state = nullptr; MonotonicStopWatch _watcher; - std::list<std::shared_ptr<Dependency>> _children; std::mutex _task_lock; std::vector<PipelineXTask*> _blocked_task; @@ -322,31 +323,6 @@ protected: std::shared_ptr<std::atomic_bool> _blocked_by_rf; }; -struct EmptySharedState final : public BasicSharedState {}; - -struct AndSharedState final : public BasicSharedState {}; - -class AndDependency final : public Dependency { -public: - using SharedState = AndSharedState; - ENABLE_FACTORY_CREATOR(AndDependency); - AndDependency(int id, int node_id, QueryContext* query_ctx) - : Dependency(id, node_id, "AndDependency", query_ctx) {} - - std::string debug_string(int indentation_level = 0) override; - - void add_child(std::shared_ptr<Dependency> child) override { _children.push_back(child); } - - [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override { - for (auto& child : Dependency::_children) { - if (auto* dep = child->is_blocked_by(task)) { - return dep; - } - } - return nullptr; - } -}; - struct AggSharedState : public BasicSharedState { public: AggSharedState() { @@ -661,25 +637,28 @@ public: ENABLE_FACTORY_CREATOR(LocalExchangeSharedState); LocalExchangeSharedState(int num_instances); std::unique_ptr<Exchanger> exchanger {}; - std::vector<DependencySPtr> source_dependencies; - DependencySPtr sink_dependency; std::vector<MemTracker*> mem_trackers; std::atomic<size_t> mem_usage = 0; std::mutex le_lock; + void create_source_dependencies(int operator_id, int node_id, QueryContext* ctx) { + for (size_t i = 0; i < source_deps.size(); i++) { + source_deps[i] = std::make_shared<Dependency>( + operator_id, node_id, "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY", ctx); + source_deps[i]->set_shared_state(this); + } + }; void sub_running_sink_operators(); void _set_always_ready() { - for (auto& dep : source_dependencies) { + for (auto& dep : source_deps) { DCHECK(dep); dep->set_always_ready(); } } - void set_dep_by_channel_id(DependencySPtr dep, int channel_id) { - source_dependencies[channel_id] = dep; - } + Dependency* get_dep_by_channel_id(int channel_id) { return source_deps[channel_id].get(); } void set_ready_to_read(int channel_id) { - auto& dep = source_dependencies[channel_id]; + auto& dep = source_deps[channel_id]; DCHECK(dep) << channel_id; dep->set_ready(); } @@ -700,13 +679,13 @@ public: void add_total_mem_usage(size_t delta) { if (mem_usage.fetch_add(delta) > config::local_exchange_buffer_mem_limit) { - sink_dependency->block(); + sink_deps.front()->block(); } } void sub_total_mem_usage(size_t delta) { if (mem_usage.fetch_sub(delta) <= config::local_exchange_buffer_mem_limit) { - sink_dependency->set_ready(); + sink_deps.front()->set_ready(); } } }; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 568871835c6..71a5a6b3c13 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -26,7 +26,6 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); _channel_id = info.task_idx; - _shared_state->set_dep_by_channel_id(info.dependency, _channel_id); _shared_state->mem_trackers[_channel_id] = _mem_tracker.get(); _exchanger = _shared_state->exchanger.get(); DCHECK(_exchanger != nullptr); diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 433e4b48654..30ff16dde80 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -267,80 +267,24 @@ Status DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state, return Status::OK(); } -template <typename SharedStateType> -constexpr bool NeedToCreate = true; -template <> -inline constexpr bool NeedToCreate<MultiCastSharedState> = false; -template <> -inline constexpr bool NeedToCreate<SetSharedState> = false; -template <> -inline constexpr bool NeedToCreate<UnionSharedState> = false; -template <> -inline constexpr bool NeedToCreate<LocalExchangeSharedState> = false; - template <typename LocalStateType> -void DataSinkOperatorX<LocalStateType>::get_dependency( - vector<DependencySPtr>& dependency, - std::map<int, std::shared_ptr<BasicSharedState>>& shared_states, QueryContext* ctx) { - std::shared_ptr<BasicSharedState> ss = nullptr; - if constexpr (NeedToCreate<typename LocalStateType::SharedStateType>) { - ss.reset(new typename LocalStateType::SharedStateType()); - DCHECK(!shared_states.contains(dests_id().front())); - if constexpr (!std::is_same_v<typename LocalStateType::SharedStateType, FakeSharedState>) { - shared_states.insert({dests_id().front(), ss}); - } +std::shared_ptr<BasicSharedState> DataSinkOperatorX<LocalStateType>::create_shared_state() const { + if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, + LocalExchangeSharedState>) { + return nullptr; } else if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, MultiCastSharedState>) { - ss = ((MultiCastDataStreamSinkOperatorX*)this)->create_multi_cast_data_streamer(); - auto& dests = dests_id(); - for (auto& dest_id : dests) { - DCHECK(!shared_states.contains(dest_id)); - shared_states.insert({dest_id, ss}); - } - } - if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, AndSharedState>) { - auto& dests = dests_id(); - for (auto& dest_id : dests) { - dependency.push_back(std::make_shared<AndDependency>(dest_id, _node_id, ctx)); - dependency.back()->set_shared_state(ss.get()); - } - } else if constexpr (!std::is_same_v<typename LocalStateType::SharedStateType, - FakeSharedState>) { - auto& dests = dests_id(); - for (auto& dest_id : dests) { - dependency.push_back(std::make_shared<Dependency>(dest_id, _node_id, - _name + "_DEPENDENCY", true, ctx)); - dependency.back()->set_shared_state(ss.get()); - } + LOG(FATAL) << "should not reach here!"; + return nullptr; } else { - dependency.push_back(nullptr); - } -} - -template <typename LocalStateType> -DependencySPtr OperatorX<LocalStateType>::get_dependency( - QueryContext* ctx, std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) { - std::shared_ptr<BasicSharedState> ss = nullptr; - if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, SetSharedState>) { + std::shared_ptr<BasicSharedState> ss = nullptr; ss.reset(new typename LocalStateType::SharedStateType()); - shared_states.insert({operator_id(), ss}); - } else if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, - UnionSharedState>) { - ss.reset(new typename LocalStateType::SharedStateType( - ((UnionSourceOperatorX*)this)->get_child_count())); - shared_states.insert({operator_id(), ss}); - } - DependencySPtr dep = nullptr; - if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, AndSharedState>) { - dep = std::make_shared<AndDependency>(_operator_id, _node_id, ctx); - } else if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, - FakeSharedState>) { - dep = std::make_shared<FakeDependency>(_operator_id, _node_id, ctx); - } else { - dep = std::make_shared<Dependency>(_operator_id, _node_id, _op_name + "_DEPENDENCY", ctx); - dep->set_shared_state(ss.get()); + ss->id = operator_id(); + for (auto& dest : dests_id()) { + ss->related_op_ids.insert(dest); + } + return ss; } - return dep; } template <typename LocalStateType> @@ -373,25 +317,22 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState _runtime_profile->set_is_sink(false); info.parent_profile->add_child(_runtime_profile.get(), true, nullptr); constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, FakeSharedState>; - _dependency = info.dependency.get(); if constexpr (!is_fake_shared) { - _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( - _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); - auto& deps = info.upstream_dependencies; if constexpr (std::is_same_v<LocalExchangeSharedState, SharedStateArg>) { - _dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first.get()); - _shared_state = _dependency->shared_state()->template cast<SharedStateArg>(); - - _shared_state->source_dep = info.dependency; - } else if constexpr (!std::is_same_v<SharedStateArg, EmptySharedState> && - !std::is_same_v<SharedStateArg, AndSharedState>) { - _dependency->set_shared_state(info.shared_state); - _shared_state = _dependency->shared_state()->template cast<SharedStateArg>(); - - _shared_state->source_dep = info.dependency; - if (!deps.empty()) { - _shared_state->sink_dep = deps.front(); - } + _shared_state = info.le_state_map[_parent->operator_id()].first.get(); + + _dependency = _shared_state->get_dep_by_channel_id(info.task_idx); + _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( + _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); + } else if (info.shared_state) { + // For UnionSourceOperator without children, there is no shared state. + _shared_state = info.shared_state->template cast<SharedStateArg>(); + + _dependency = _shared_state->create_source_dependency( + _parent->operator_id(), _parent->node_id(), _parent->get_name(), + state->get_query_ctx()); + _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( + _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); } } @@ -445,20 +386,19 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink _wait_for_finish_dependency_timer = ADD_TIMER(_profile, "PendingFinishDependency"); constexpr auto is_fake_shared = std::is_same_v<SharedState, FakeSharedState>; if constexpr (!is_fake_shared) { - auto& deps = info.dependencies; - _dependency = deps.front().get(); if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) { _dependency = info.le_state_map[_parent->dests_id().front()].second.get(); - } - if (_dependency) { _shared_state = (SharedState*)_dependency->shared_state(); - _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( - _profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); + } else { + _shared_state = info.shared_state->template cast<SharedState>(); + _dependency = _shared_state->create_sink_dependency( + _parent->dests_id().front(), _parent->node_id(), _parent->get_name(), + state->get_query_ctx()); } + _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( + _profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); } else { - auto& deps = info.dependencies; - deps.front() = std::make_shared<FakeDependency>(0, 0, state->get_query_ctx()); - _dependency = deps.front().get(); + _dependency = nullptr; } _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", TUnit::UNIT, 1); _open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1); @@ -656,7 +596,6 @@ template class PipelineXSinkLocalState<UnionSharedState>; template class PipelineXSinkLocalState<PartitionSortNodeSharedState>; template class PipelineXSinkLocalState<MultiCastSharedState>; template class PipelineXSinkLocalState<SetSharedState>; -template class PipelineXSinkLocalState<AndSharedState>; template class PipelineXSinkLocalState<LocalExchangeSharedState>; template class PipelineXSinkLocalState<BasicSharedState>; @@ -671,8 +610,6 @@ template class PipelineXLocalState<MultiCastSharedState>; template class PipelineXLocalState<PartitionSortNodeSharedState>; template class PipelineXLocalState<SetSharedState>; template class PipelineXLocalState<LocalExchangeSharedState>; -template class PipelineXLocalState<EmptySharedState>; -template class PipelineXLocalState<AndSharedState>; template class PipelineXLocalState<BasicSharedState>; template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>; diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 5d1268e50c9..b9c02a935a6 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -31,13 +31,10 @@ namespace doris::pipeline { struct LocalStateInfo { RuntimeProfile* parent_profile = nullptr; const std::vector<TScanRangeParams> scan_ranges; - std::vector<DependencySPtr>& upstream_dependencies; BasicSharedState* shared_state; std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> le_state_map; const int task_idx; - - DependencySPtr dependency; }; // This struct is used only for initializing local sink state. @@ -45,7 +42,7 @@ struct LocalSinkStateInfo { const int task_idx; RuntimeProfile* parent_profile = nullptr; const int sender_id; - std::vector<DependencySPtr>& dependencies; + BasicSharedState* shared_state; std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> le_state_map; const TDataSink& tsink; @@ -100,7 +97,7 @@ public: [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const = 0; - virtual Dependency* dependency() { return nullptr; } + virtual std::vector<Dependency*> dependencies() const { return {nullptr}; } // override in Scan virtual Dependency* finishdependency() { return nullptr; } @@ -184,8 +181,6 @@ public: throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name); } [[nodiscard]] std::string get_name() const override { return _op_name; } - [[nodiscard]] virtual DependencySPtr get_dependency( - QueryContext* ctx, std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) = 0; [[nodiscard]] virtual DataDistribution required_data_distribution() const { return _child_x && _child_x->ignore_data_distribution() && !is_source() ? DataDistribution(ExchangeType::PASSTHROUGH) @@ -348,10 +343,6 @@ public: [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const { return state->get_local_state(operator_id())->template cast<LocalState>(); } - - DependencySPtr get_dependency( - QueryContext* ctx, - std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) override; }; template <typename SharedStateArg = FakeSharedState> @@ -372,7 +363,9 @@ public: [[nodiscard]] std::string debug_string(int indentation_level = 0) const override; - Dependency* dependency() override { return _dependency; } + std::vector<Dependency*> dependencies() const override { + return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {}; + } protected: Dependency* _dependency = nullptr; @@ -422,7 +415,7 @@ public: RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; } RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } - virtual Dependency* dependency() { return nullptr; } + virtual std::vector<Dependency*> dependencies() const { return {nullptr}; } // override in exchange sink , AsyncWriterSink virtual Dependency* finishdependency() { return nullptr; } @@ -513,9 +506,7 @@ public: return reinterpret_cast<const TARGET&>(*this); } - virtual void get_dependency(std::vector<DependencySPtr>& dependency, - std::map<int, std::shared_ptr<BasicSharedState>>& shared_states, - QueryContext* ctx) = 0; + [[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0; [[nodiscard]] virtual DataDistribution required_data_distribution() const { return _child_x && _child_x->ignore_data_distribution() ? DataDistribution(ExchangeType::PASSTHROUGH) @@ -612,9 +603,7 @@ public: ~DataSinkOperatorX() override = default; Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; - void get_dependency(std::vector<DependencySPtr>& dependency, - std::map<int, std::shared_ptr<BasicSharedState>>& shared_states, - QueryContext* ctx) override; + std::shared_ptr<BasicSharedState> create_shared_state() const override; using LocalState = LocalStateType; [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const { @@ -640,7 +629,9 @@ public: virtual std::string name_suffix() { return " (id=" + std::to_string(_parent->node_id()) + ")"; } - Dependency* dependency() override { return _dependency; } + std::vector<Dependency*> dependencies() const override { + return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {}; + } protected: Dependency* _dependency = nullptr; @@ -717,7 +708,9 @@ public: Status sink(RuntimeState* state, vectorized::Block* block, bool eos); - Dependency* dependency() override { return _async_writer_dependency.get(); } + std::vector<Dependency*> dependencies() const override { + return {_async_writer_dependency.get()}; + } Status close(RuntimeState* state, Status exec_status) override; Dependency* finishdependency() override { return _finish_dependency.get(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index f13cf37b1fb..0d1d7784f88 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -610,9 +610,13 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( auto& deps = _dag[_pipeline->id()]; for (auto& dep : deps) { if (pipeline_id_to_task.contains(dep)) { - task->add_upstream_dependency( - pipeline_id_to_task[dep]->get_downstream_dependency(), - pipeline_id_to_task[dep]->get_shared_states()); + auto ss = pipeline_id_to_task[dep]->get_sink_shared_state(); + if (ss) { + task->inject_shared_state(ss); + } else { + pipeline_id_to_task[dep]->inject_shared_state( + task->get_source_shared_state()); + } } } } @@ -781,7 +785,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( "LOCAL_EXCHANGE_SINK_DEPENDENCY", true, _runtime_state->get_query_ctx()); sink_dep->set_shared_state(shared_state.get()); - shared_state->sink_dependency = sink_dep; + shared_state->sink_deps.push_back(sink_dep); _op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}}); // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to @@ -804,6 +808,9 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( } operator_xs.insert(operator_xs.begin(), source_op); + shared_state->create_source_dependencies(source_op->operator_id(), source_op->node_id(), + _query_ctx.get()); + // 5. Set children for two pipelines separately. std::vector<std::shared_ptr<Pipeline>> new_children; std::vector<PipelineId> edges_with_source; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 9630484f443..1cbf4e4940e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -191,8 +191,6 @@ private: #pragma clang diagnostic pop #endif - std::atomic_bool _canceled = false; - // `_dag` manage dependencies between pipelines by pipeline ID. the indices will be blocked by members std::map<PipelineId, std::vector<PipelineId>> _dag; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 7c0e81c994f..0459f1e3a0e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -60,10 +60,10 @@ PipelineXTask::PipelineXTask( _task_idx(task_idx), _execution_dep(state->get_query_ctx()->get_execution_dependency()) { _pipeline_task_watcher.start(); - _sink->get_dependency(_downstream_dependency, _shared_states, state->get_query_ctx()); - for (auto& op : _operators) { - _source_dependency.insert( - {op->operator_id(), op->get_dependency(state->get_query_ctx(), _shared_states)}); + + auto shared_state = _sink->create_shared_state(); + if (shared_state) { + _sink_shared_state = shared_state; } pipeline->incr_created_tasks(); } @@ -82,7 +82,7 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const LocalSinkStateInfo info {_task_idx, _task_profile.get(), local_params.sender_id, - get_downstream_dependency(), + get_sink_shared_state().get(), _le_state_map, tsink}; RETURN_IF_ERROR(_sink->setup_local_state(_state, info)); @@ -97,14 +97,8 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& op = _operators[op_idx]; - auto& deps = get_upstream_dependency(op->operator_id()); - LocalStateInfo info {parent_profile, - scan_ranges, - deps, - get_shared_state(op->operator_id()), - _le_state_map, - _task_idx, - _source_dependency[op->operator_id()]}; + LocalStateInfo info {parent_profile, scan_ranges, get_op_shared_state(op->operator_id()), + _le_state_map, _task_idx}; RETURN_IF_ERROR(op->setup_local_state(_state, info)); parent_profile = _state->get_local_state(op->operator_id())->profile(); query_ctx->register_query_statistics( @@ -126,9 +120,9 @@ Status PipelineXTask::_extract_dependencies() { return result.error(); } auto* local_state = result.value(); - auto* dep = local_state->dependency(); - DCHECK(dep != nullptr); - _read_dependencies.push_back(dep); + const auto& deps = local_state->dependencies(); + std::copy(deps.begin(), deps.end(), + std::inserter(_read_dependencies, _read_dependencies.end())); auto* fin_dep = local_state->finishdependency(); if (fin_dep) { _finish_dependencies.push_back(fin_dep); @@ -136,9 +130,9 @@ Status PipelineXTask::_extract_dependencies() { } { auto* local_state = _state->get_sink_local_state(); - auto* dep = local_state->dependency(); - DCHECK(dep != nullptr); - _write_dependencies = dep; + _write_dependencies = local_state->dependencies(); + DCHECK(std::all_of(_write_dependencies.begin(), _write_dependencies.end(), + [](auto* dep) { return dep->is_write_dependency(); })); auto* fin_dep = local_state->finishdependency(); if (fin_dep) { _finish_dependencies.push_back(fin_dep); @@ -302,10 +296,8 @@ void PipelineXTask::finalize() { PipelineTask::finalize(); std::unique_lock<std::mutex> lc(_release_lock); _finished = true; - std::vector<DependencySPtr> {}.swap(_downstream_dependency); - _upstream_dependency.clear(); - _source_dependency.clear(); - _shared_states.clear(); + _sink_shared_state.reset(); + _op_shared_states.clear(); _le_state_map.clear(); } @@ -372,8 +364,10 @@ std::string PipelineXTask::debug_string() { } fmt::format_to(debug_string_buffer, "Write Dependency Information: \n"); - fmt::format_to(debug_string_buffer, "{}. {}\n", i, _write_dependencies->debug_string(1)); - i++; + for (size_t j = 0; j < _write_dependencies.size(); j++, i++) { + fmt::format_to(debug_string_buffer, "{}. {}\n", i, + _write_dependencies[j]->debug_string(i + 1)); + } if (_filter_dependency) { fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n"); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 6e8a3773751..8707de4b54c 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -94,41 +94,36 @@ public: bool is_pending_finish() override { return _finish_blocked_dependency() != nullptr; } - std::vector<DependencySPtr>& get_downstream_dependency() { return _downstream_dependency; } - std::map<int, std::shared_ptr<BasicSharedState>>& get_shared_states() { return _shared_states; } - - void add_upstream_dependency(std::vector<DependencySPtr>& multi_upstream_dependency, - std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) { - for (auto dep : multi_upstream_dependency) { - int dst_id = dep->id(); - if (!_upstream_dependency.contains(dst_id)) { - _upstream_dependency.insert({dst_id, {dep}}); - } else { - _upstream_dependency[dst_id].push_back(dep); - } + std::shared_ptr<BasicSharedState> get_source_shared_state() { + return _op_shared_states.contains(_source->operator_id()) + ? _op_shared_states[_source->operator_id()] + : nullptr; + } - if (shared_states.contains(dst_id) && !_shared_states.contains(dst_id)) { - // Shared state is created by upstream task's sink operator and shared by source operator of this task. - _shared_states.insert({dst_id, shared_states[dst_id]}); - } else if (_shared_states.contains(dst_id) && !shared_states.contains(dst_id)) { - // Shared state is created by this task's source operator and shared by upstream task's sink operator. - shared_states.insert({dst_id, _shared_states[dst_id]}); + void inject_shared_state(std::shared_ptr<BasicSharedState> shared_state) { + if (!shared_state) { + return; + } + // Shared state is created by upstream task's sink operator and shared by source operator of this task. + for (auto& op : _operators) { + if (shared_state->related_op_ids.contains(op->operator_id())) { + _op_shared_states.insert({op->operator_id(), shared_state}); + return; } } - } - - std::vector<DependencySPtr>& get_upstream_dependency(int id) { - if (_upstream_dependency.find(id) == _upstream_dependency.end()) { - _upstream_dependency.insert({id, {}}); + if (shared_state->related_op_ids.contains(_sink->dests_id().front())) { + DCHECK(_sink_shared_state == nullptr); + _sink_shared_state = shared_state; } - return _upstream_dependency[id]; } - BasicSharedState* get_shared_state(int id) { - if (!_shared_states.contains(id)) { + std::shared_ptr<BasicSharedState> get_sink_shared_state() { return _sink_shared_state; } + + BasicSharedState* get_op_shared_state(int id) { + if (!_op_shared_states.contains(id)) { return nullptr; } - return _shared_states[id].get(); + return _op_shared_states[id].get(); } bool is_pipelineX() const override { return true; } @@ -161,10 +156,12 @@ public: private: Dependency* _write_blocked_dependency() { - _blocked_dep = _write_dependencies->is_blocked_by(this); - if (_blocked_dep != nullptr) { - static_cast<Dependency*>(_blocked_dep)->start_watcher(); - return _blocked_dep; + for (auto* op_dep : _write_dependencies) { + _blocked_dep = op_dep->is_blocked_by(this); + if (_blocked_dep != nullptr) { + _blocked_dep->start_watcher(); + return _blocked_dep; + } } return nullptr; } @@ -203,18 +200,13 @@ private: DataSinkOperatorXPtr _sink; std::vector<Dependency*> _read_dependencies; - Dependency* _write_dependencies; + std::vector<Dependency*> _write_dependencies; std::vector<Dependency*> _finish_dependencies; RuntimeFilterDependency* _filter_dependency; - // Write dependencies of upstream pipeline tasks. - DependencyMap _upstream_dependency; - // Read dependencies of this pipeline task. - std::map<int, DependencySPtr> _source_dependency; - // Write dependencies of this pipeline tasks. - std::vector<DependencySPtr> _downstream_dependency; // All shared states of this pipeline task. - std::map<int, std::shared_ptr<BasicSharedState>> _shared_states; + std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states; + std::shared_ptr<BasicSharedState> _sink_shared_state; std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>> _le_state_map; int _task_idx; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
