This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2b7fa9d6bbd42fdb9eca2a21c8e0cc4e7dcec10a
Author: Gabriel <[email protected]>
AuthorDate: Thu Feb 29 09:58:27 2024 +0800

    [pipelineX](refactor) Rebuild relationship between dep and operator (#31487)
---
 be/src/pipeline/exec/analytic_source_operator.h    |   1 -
 be/src/pipeline/exec/es_scan_operator.cpp          |   2 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp    |   7 --
 be/src/pipeline/exec/exchange_sink_operator.h      |  18 ++-
 be/src/pipeline/exec/exchange_source_operator.cpp  |   5 +-
 be/src/pipeline/exec/exchange_source_operator.h    |  11 +-
 be/src/pipeline/exec/file_scan_operator.cpp        |   2 +-
 be/src/pipeline/exec/multi_cast_data_stream_sink.h |  11 +-
 be/src/pipeline/exec/olap_scan_operator.cpp        |   4 +-
 be/src/pipeline/exec/scan_operator.cpp             |  28 +++--
 be/src/pipeline/exec/scan_operator.h               |   9 +-
 be/src/pipeline/exec/set_probe_sink_operator.h     |   2 +
 be/src/pipeline/exec/set_sink_operator.cpp         |   2 -
 be/src/pipeline/exec/set_sink_operator.h           |   5 +-
 be/src/pipeline/exec/set_source_operator.cpp       |   7 +-
 be/src/pipeline/exec/set_source_operator.h         |   6 +-
 be/src/pipeline/exec/union_sink_operator.h         |  13 ++
 be/src/pipeline/exec/union_source_operator.cpp     |  32 +++--
 be/src/pipeline/exec/union_source_operator.h       |   4 +
 be/src/pipeline/pipeline_x/dependency.cpp          |  29 +++--
 be/src/pipeline/pipeline_x/dependency.h            |  87 ++++++--------
 .../local_exchange_source_operator.cpp             |   1 -
 be/src/pipeline/pipeline_x/operator.cpp            | 131 ++++++---------------
 be/src/pipeline/pipeline_x/operator.h              |  35 +++---
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  15 ++-
 .../pipeline_x/pipeline_x_fragment_context.h       |   2 -
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |  44 +++----
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |  70 +++++------
 28 files changed, 266 insertions(+), 317 deletions(-)

diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h
index e98a50186e9..b2ab5b24b3c 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -76,7 +76,6 @@ private:
         if (need_more_input) {
             _dependency->block();
             _dependency->set_ready_to_write();
-            _shared_state->sink_dep->set_ready();
         } else {
             _dependency->set_block_to_write();
             _dependency->set_ready();
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp
index 96283ac5052..c00ee6917ea 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -67,7 +67,7 @@ Status EsScanLocalState::_process_conjuncts() {
 Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* 
scanners) {
     if (_scan_ranges.empty()) {
         _eos = true;
-        _dependency->set_ready();
+        _scan_dependency->set_ready();
         return Status::OK();
     }
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index a43cc07b92a..1220230a343 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -170,12 +170,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
             id, p._dest_node_id, _sender_id, _state->be_number(), state);
 
     register_channels(_sink_buffer.get());
-    auto* _exchange_sink_dependency = _dependency;
     _queue_dependency =
             Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
                                       "ExchangeSinkQueueDependency", true, 
state->get_query_ctx());
     _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
-    _exchange_sink_dependency->add_child(_queue_dependency);
     if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 
1) &&
         !only_local_exchange) {
         _broadcast_dependency =
@@ -186,7 +184,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
         for (int i = 0; i < config::num_broadcast_buffer; ++i) {
             
_broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared());
         }
-        _exchange_sink_dependency->add_child(_broadcast_dependency);
 
         _wait_broadcast_buffer_timer =
                 ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", 
timer_name);
@@ -194,19 +191,15 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
         size_t dep_id = 0;
         _local_channels_dependency.resize(local_size);
         _wait_channel_timer.resize(local_size);
-        auto deps_for_channels = AndDependency::create_shared(
-                _parent->operator_id(), _parent->node_id(), 
state->get_query_ctx());
         for (auto channel : channels) {
             if (channel->is_local()) {
                 _local_channels_dependency[dep_id] = 
channel->get_local_channel_dependency();
                 DCHECK(_local_channels_dependency[dep_id] != nullptr);
-                
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
                 _wait_channel_timer[dep_id] = ADD_CHILD_TIMER(
                         _profile, fmt::format("WaitForLocalExchangeBuffer{}", 
dep_id), timer_name);
                 dep_id++;
             }
         }
-        _exchange_sink_dependency->add_child(deps_for_channels);
     }
     if (p._part_type == TPartitionType::HASH_PARTITIONED) {
         _partition_count = channels.size();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 43919236a86..4a2f1a3dfd9 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -64,9 +64,9 @@ private:
     int _mult_cast_id = -1;
 };
 
-class ExchangeSinkLocalState final : public 
PipelineXSinkLocalState<AndSharedState> {
+class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
     ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
-    using Base = PipelineXSinkLocalState<AndSharedState>;
+    using Base = PipelineXSinkLocalState<>;
 
 public:
     ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
@@ -79,6 +79,16 @@ public:
                 state->get_query_ctx());
     }
 
+    std::vector<Dependency*> dependencies() const override {
+        std::vector<Dependency*> dep_vec;
+        dep_vec.push_back(_queue_dependency.get());
+        if (_broadcast_dependency) {
+            dep_vec.push_back(_broadcast_dependency.get());
+        }
+        std::for_each(_local_channels_dependency.begin(), 
_local_channels_dependency.end(),
+                      [&](std::shared_ptr<Dependency> dep) { 
dep_vec.push_back(dep.get()); });
+        return dep_vec;
+    }
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
@@ -154,8 +164,8 @@ private:
 
     vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
 
-    std::shared_ptr<Dependency> _queue_dependency;
-    std::shared_ptr<Dependency> _broadcast_dependency;
+    std::shared_ptr<Dependency> _queue_dependency = nullptr;
+    std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
 
     /**
      * We use this to control the execution for local exchange.
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 2c519319dd0..664e576e1ce 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -74,7 +74,6 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, p.input_row_desc(), state->fragment_instance_id(), 
p.node_id(), p.num_senders(),
             profile(), p.is_merging());
-    auto* source_dependency = _dependency;
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
     metrics.resize(queues.size());
@@ -82,10 +81,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
         deps[i] = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
                                             "SHUFFLE_DATA_DEPENDENCY", 
state->get_query_ctx());
         queues[i]->set_dependency(deps[i]);
-        source_dependency->add_child(deps[i]);
     }
-    static const std::string timer_name =
-            "WaitForDependency[" + source_dependency->name() + "]Time";
+    static const std::string timer_name = "WaitForDependencyTime";
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, 
timer_name, 1);
     for (size_t i = 0; i < queues.size(); i++) {
         metrics[i] = ADD_CHILD_TIMER_WITH_LEVEL(_runtime_profile, 
fmt::format("WaitForData{}", i),
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 1b106f2e259..6176ad5b7f7 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -51,17 +51,24 @@ public:
 };
 
 class ExchangeSourceOperatorX;
-class ExchangeLocalState final : public PipelineXLocalState<AndSharedState> {
+class ExchangeLocalState final : public PipelineXLocalState<> {
     ENABLE_FACTORY_CREATOR(ExchangeLocalState);
 
 public:
-    using Base = PipelineXLocalState<AndSharedState>;
+    using Base = PipelineXLocalState<>;
     ExchangeLocalState(RuntimeState* state, OperatorXBase* parent);
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
     std::string debug_string(int indentation_level) const override;
+
+    std::vector<Dependency*> dependencies() const override {
+        std::vector<Dependency*> dep_vec;
+        std::for_each(deps.begin(), deps.end(),
+                      [&](std::shared_ptr<Dependency> dep) { 
dep_vec.push_back(dep.get()); });
+        return dep_vec;
+    }
     std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
     doris::vectorized::VSortExecExprs vsort_exec_exprs;
     int64_t num_rows_skipped;
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp
index ce0c5042cff..ac193147dfb 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -33,7 +33,7 @@ namespace doris::pipeline {
 Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* 
scanners) {
     if (_scan_ranges.empty()) {
         _eos = true;
-        _dependency->set_ready();
+        _scan_dependency->set_ready();
         return Status::OK();
     }
 
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index d9a29dfa0d4..b4886f089ef 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -92,11 +92,16 @@ public:
 
     const RowDescriptor& row_desc() const override { return _row_desc; }
 
-    std::shared_ptr<MultiCastSharedState> create_multi_cast_data_streamer() {
-        auto multi_cast_data_streamer =
+    std::shared_ptr<BasicSharedState> create_shared_state() const override {
+        std::shared_ptr<BasicSharedState> ss =
                 std::make_shared<MultiCastSharedState>(_row_desc, _pool, 
_cast_sender_count);
-        return multi_cast_data_streamer;
+        ss->id = operator_id();
+        for (auto& dest : dests_id()) {
+            ss->related_op_ids.insert(dest);
+        }
+        return ss;
     }
+
     const TMultiCastDataStreamSink& sink_node() { return _sink; }
 
 private:
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index f443cacf040..0aab714449e 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -225,7 +225,7 @@ bool OlapScanLocalState::_storage_no_merge() {
 Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* 
scanners) {
     if (_scan_ranges.empty()) {
         _eos = true;
-        _dependency->set_ready();
+        _scan_dependency->set_ready();
         return Status::OK();
     }
     SCOPED_TIMER(_scanner_init_timer);
@@ -486,7 +486,7 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
         }
         if (eos) {
             _eos = true;
-            _dependency->set_ready();
+            _scan_dependency->set_ready();
         }
 
         for (auto& iter : _colname_to_value_range) {
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index e3b0fe38a1d..0fb92d793a8 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -111,7 +111,12 @@ bool ScanLocalState<Derived>::should_run_serial() const {
 
 template <typename Derived>
 Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& 
info) {
-    RETURN_IF_ERROR(PipelineXLocalState<EmptySharedState>::init(state, info));
+    RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+    _scan_dependency =
+            Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                      _parent->get_name() + "_DEPENDENCY", 
state->get_query_ctx());
+    _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+            _runtime_profile, "WaitForDependency[" + _scan_dependency->name() 
+ "]Time", 1);
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<typename Derived::Parent>();
@@ -252,7 +257,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() {
                 [&](auto&& range) {
                     if (range.is_empty_value_range()) {
                         _eos = true;
-                        _dependency->set_ready();
+                        _scan_dependency->set_ready();
                     }
                 },
                 it.second.second);
@@ -543,8 +548,7 @@ template <typename Derived>
 std::string ScanLocalState<Derived>::debug_string(int indentation_level) const 
{
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}, _eos = {}",
-                   
PipelineXLocalState<EmptySharedState>::debug_string(indentation_level),
-                   _eos.load());
+                   PipelineXLocalState<>::debug_string(indentation_level), 
_eos.load());
     if (_scanner_ctx) {
         fmt::format_to(debug_string_buffer, "");
         fmt::format_to(debug_string_buffer, ", Scanner Context: {}", 
_scanner_ctx->debug_string());
@@ -587,7 +591,7 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
             if (constant_val == nullptr || 
!*reinterpret_cast<bool*>(constant_val)) {
                 *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
                 _eos = true;
-                _dependency->set_ready();
+                _scan_dependency->set_ready();
             }
         } else if (const vectorized::ColumnVector<vectorized::UInt8>* 
bool_column =
                            
check_and_get_column<vectorized::ColumnVector<vectorized::UInt8>>(
@@ -605,7 +609,7 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
                 if (constant_val == nullptr || 
!*reinterpret_cast<bool*>(constant_val)) {
                     *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
                     _eos = true;
-                    _dependency->set_ready();
+                    _scan_dependency->set_ready();
                 }
             } else {
                 LOG(WARNING) << "Constant predicate in scan node should return 
a bool column with "
@@ -803,7 +807,7 @@ Status ScanLocalState<Derived>::_normalize_not_in_and_not_eq_predicate(
         auto fn_name = std::string("");
         if (!is_fixed_range && state->null_in_set) {
             _eos = true;
-            _dependency->set_ready();
+            _scan_dependency->set_ready();
         }
         while (iter->has_next()) {
             // column not in (nullptr) is always true
@@ -1201,7 +1205,7 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
     }
     if (scanners.empty()) {
         _eos = true;
-        _dependency->set_ready();
+        _scan_dependency->set_ready();
     } else {
         for (auto& scanner : scanners) {
             scanner->set_query_statistics(_query_statistics.get());
@@ -1218,7 +1222,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = PipXScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), 
scanners, p.limit(),
-            state()->scan_queue_mem_limit(), _dependency->shared_from_this());
+            state()->scan_queue_mem_limit(), _scan_dependency);
     return Status::OK();
 }
 
@@ -1404,7 +1408,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    COUNTER_UPDATE(exec_time_counter(), _dependency->watcher_elapse_time());
+    COUNTER_UPDATE(exec_time_counter(), 
_scan_dependency->watcher_elapse_time());
     COUNTER_UPDATE(exec_time_counter(), 
_filter_dependency->watcher_elapse_time());
     SCOPED_TIMER(_close_timer);
 
@@ -1412,10 +1416,10 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
     if (_scanner_ctx) {
         _scanner_ctx->stop_scanners(state);
     }
-    COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
+    COUNTER_SET(_wait_for_dependency_timer, 
_scan_dependency->watcher_elapse_time());
     COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
 
-    return PipelineXLocalState<EmptySharedState>::close(state);
+    return PipelineXLocalState<>::close(state);
 }
 
 template <typename LocalStateType>
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 0b1b2f46da7..958581b2f6f 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -57,11 +57,10 @@ public:
     std::string debug_string() const override;
 };
 
-class ScanLocalStateBase : public PipelineXLocalState<EmptySharedState>,
-                           public vectorized::RuntimeFilterConsumer {
+class ScanLocalStateBase : public PipelineXLocalState<>, public 
vectorized::RuntimeFilterConsumer {
 public:
     ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
-            : PipelineXLocalState<EmptySharedState>(state, parent),
+            : PipelineXLocalState<>(state, parent),
               vectorized::RuntimeFilterConsumer(parent->node_id(), 
parent->runtime_filter_descs(),
                                                 parent->row_descriptor(), 
_conjuncts) {}
     virtual ~ScanLocalStateBase() = default;
@@ -97,6 +96,8 @@ protected:
 
     std::atomic<bool> _opened {false};
 
+    DependencySPtr _scan_dependency = nullptr;
+
     std::shared_ptr<RuntimeProfile> _scanner_profile;
     RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
     RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
@@ -166,6 +167,8 @@ class ScanLocalState : public ScanLocalStateBase {
 
     RuntimeFilterDependency* filterdependency() override { return 
_filter_dependency.get(); };
 
+    std::vector<Dependency*> dependencies() const override { return 
{_scan_dependency.get()}; }
+
 protected:
     template <typename LocalStateType>
     friend class ScanOperatorX;
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index 2e8500cd452..6b4197ea94b 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -131,6 +131,8 @@ public:
                             : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
     }
 
+    std::shared_ptr<BasicSharedState> create_shared_state() const override { 
return nullptr; }
+
 private:
     void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
     Status _extract_probe_column(SetProbeSinkLocalState<is_intersect>& 
local_state,
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
index 9080bb24504..7ef4871555d 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -198,10 +198,8 @@ Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState
     // Create result_expr_ctx_lists_ from thrift exprs.
     if (tnode.node_type == TPlanNodeType::type::INTERSECT_NODE) {
         result_texpr_lists = &(tnode.intersect_node.result_expr_lists);
-        _child_quantity = tnode.intersect_node.result_expr_lists.size();
     } else if (tnode.node_type == TPlanNodeType::type::EXCEPT_NODE) {
         result_texpr_lists = &(tnode.except_node.result_expr_lists);
-        _child_quantity = tnode.except_node.result_expr_lists.size();
     } else {
         return Status::NotSupported("Not Implemented, Check The Operation 
Node.");
     }
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 63b2b89380b..24f23593ea0 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -99,6 +99,9 @@ public:
                      const DescriptorTbl& descs)
             : Base(sink_id, tnode.node_id, tnode.node_id),
               _cur_child_id(child_id),
+              _child_quantity(tnode.node_type == 
TPlanNodeType::type::INTERSECT_NODE
+                                      ? 
tnode.intersect_node.result_expr_lists.size()
+                                      : 
tnode.except_node.result_expr_lists.size()),
               _is_colocate(is_intersect ? tnode.intersect_node.is_colocate
                                         : tnode.except_node.is_colocate),
               _partition_exprs(is_intersect ? 
tnode.intersect_node.result_expr_lists[child_id]
@@ -131,7 +134,7 @@ private:
                                  vectorized::Block& block, 
vectorized::ColumnRawPtrs& raw_ptrs);
 
     const int _cur_child_id;
-    int _child_quantity;
+    const int _child_quantity;
     // every child has its result expr list
     vectorized::VExprContextSPtrs _child_exprs;
     const bool _is_colocate;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp
index 954ca28dc9b..15524a25a7b 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -54,11 +54,8 @@ Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, LocalStateIn
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    auto& deps = info.upstream_dependencies;
-    _shared_state->probe_finished_children_dependency.resize(deps.size(), 
nullptr);
-    for (auto& dep : deps) {
-        dep->set_shared_state(_dependency->shared_state());
-    }
+    _shared_state->probe_finished_children_dependency.resize(
+            _parent->cast<SetSourceOperatorX<is_intersect>>()._child_quantity, 
nullptr);
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h
index d7026f015cf..1c5cf162940 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -85,7 +85,10 @@ public:
 
     SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                        const DescriptorTbl& descs)
-            : Base(pool, tnode, operator_id, descs) {};
+            : Base(pool, tnode, operator_id, descs),
+              _child_quantity(tnode.node_type == 
TPlanNodeType::type::INTERSECT_NODE
+                                      ? 
tnode.intersect_node.result_expr_lists.size()
+                                      : 
tnode.except_node.result_expr_lists.size()) {};
     ~SetSourceOperatorX() override = default;
 
     [[nodiscard]] bool is_source() const override { return true; }
@@ -105,6 +108,7 @@ private:
 
     void _add_result_columns(SetSourceLocalState<is_intersect>& local_state,
                              vectorized::RowRefListWithFlags& value, int& 
block_size);
+    const int _child_quantity;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h
index 5fd670f0ef7..6d79d3f2a9f 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -111,6 +111,19 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
+    std::shared_ptr<BasicSharedState> create_shared_state() const override {
+        if (_cur_child_id > 0) {
+            return nullptr;
+        } else {
+            std::shared_ptr<BasicSharedState> ss = 
std::make_shared<UnionSharedState>(_child_size);
+            ss->id = operator_id();
+            for (auto& dest : dests_id()) {
+                ss->related_op_ids.insert(dest);
+            }
+            return ss;
+        }
+    }
+
 private:
     int _get_first_materialized_child_idx() const { return 
_first_materialized_child_idx; }
 
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index f2f4ca82e4c..dc1de2900b9 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -111,15 +111,18 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<Parent>();
-    int child_count = p.get_child_count();
-    if (child_count != 0) {
-        auto& deps = info.upstream_dependencies;
-        for (auto& dep : deps) {
-            dep->set_shared_state(_dependency->shared_state());
-        }
+    if (p.get_child_count() != 0) {
+        ((UnionSharedState*)_dependency->shared_state())
+                
->data_queue.set_source_dependency(_shared_state->source_deps.front());
+    } else {
+        _only_const_dependency = Dependency::create_shared(
+                _parent->operator_id(), _parent->node_id(), 
_parent->get_name() + "_DEPENDENCY",
+                state->get_query_ctx());
+        _dependency = _only_const_dependency.get();
+        _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+                _runtime_profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
     }
-    ((UnionSharedState*)_dependency->shared_state())
-            ->data_queue.set_source_dependency(info.dependency);
+
     // Const exprs materialized by this node. These exprs don't refer to any 
children.
     // Only materialized by the first fragment instance to avoid duplication.
     if (state->per_fragment_instance_idx() == 0) {
@@ -138,7 +141,8 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
             RETURN_IF_ERROR(clone_expr_list(_const_expr_list, 
other_expr_list));
         }
     }
-    if (child_count == 0) {
+
+    if (p.get_child_count() == 0) {
         _dependency->set_ready();
     }
     return Status::OK();
@@ -147,9 +151,11 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
 std::string UnionSourceLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
-    fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, 
has_data = {})",
-                   _shared_state->data_queue.is_all_finish(),
-                   _shared_state->data_queue.remaining_has_data());
+    if (_shared_state) {
+        fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = 
{}, has_data = {})",
+                       _shared_state->data_queue.is_all_finish(),
+                       _shared_state->data_queue.remaining_has_data());
+    }
     return fmt::to_string(debug_string_buffer);
 }
 
@@ -161,7 +167,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
             RETURN_IF_ERROR(get_next_const(state, block));
         }
         local_state._need_read_for_const_expr = has_more_const(state);
-    } else {
+    } else if (_child_size != 0) {
         std::unique_ptr<vectorized::Block> output_block = 
vectorized::Block::create_unique();
         int child_idx = 0;
         
RETURN_IF_ERROR(local_state._shared_state->data_queue.get_block_from_queue(&output_block,
diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h
index 69e81bcd4fc..40d02324cbd 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -87,6 +87,10 @@ private:
     bool _need_read_for_const_expr {true};
     int _const_expr_list_idx {0};
     std::vector<vectorized::VExprContextSPtrs> _const_expr_lists;
+
+    // If this operator has no children, there is no shared state which owns 
dependency. So we
+    // use this local state to hold this dependency.
+    DependencySPtr _only_const_dependency = nullptr;
 };
 
 class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp
index 56045118a94..adbadcfb835 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -29,6 +29,22 @@
 
 namespace doris::pipeline {
 
+Dependency* BasicSharedState::create_source_dependency(int operator_id, int 
node_id,
+                                                       std::string name, 
QueryContext* ctx) {
+    source_deps.push_back(
+            std::make_shared<Dependency>(operator_id, node_id, name + 
"_DEPENDENCY", ctx));
+    source_deps.back()->set_shared_state(this);
+    return source_deps.back().get();
+}
+
+Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, 
std::string name,
+                                                     QueryContext* ctx) {
+    sink_deps.push_back(
+            std::make_shared<Dependency>(dest_id, node_id, name + 
"_DEPENDENCY", true, ctx));
+    sink_deps.back()->set_shared_state(this);
+    return sink_deps.back().get();
+}
+
 void Dependency::_add_block_task(PipelineXTask* task) {
     DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != 
task)
             << "Duplicate task: " << task->debug_string();
@@ -103,17 +119,6 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) {
     return fmt::to_string(debug_string_buffer);
 }
 
-std::string AndDependency::debug_string(int indentation_level) {
-    fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
-                   std::string(indentation_level * 2, ' '), _name, _node_id);
-    for (auto& child : _children) {
-        fmt::format_to(debug_string_buffer, "{}, \n", 
child->debug_string(indentation_level = 1));
-    }
-    fmt::format_to(debug_string_buffer, "{}]", std::string(indentation_level * 
2, ' '));
-    return fmt::to_string(debug_string_buffer);
-}
-
 bool RuntimeFilterTimer::has_ready() {
     std::unique_lock<std::mutex> lc(_lock);
     return _is_ready;
@@ -193,7 +198,7 @@ void LocalExchangeSharedState::sub_running_sink_operators() 
{
 }
 
 LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
-    source_dependencies.resize(num_instances, nullptr);
+    source_deps.resize(num_instances, nullptr);
     mem_trackers.resize(num_instances, nullptr);
 }
 
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 3de90fa915a..1733a8fd291 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -70,9 +70,18 @@ struct BasicSharedState {
                 << " and expect type is" << typeid(TARGET).name();
         return reinterpret_cast<const TARGET*>(this);
     }
-    DependencySPtr source_dep = nullptr;
-    DependencySPtr sink_dep = nullptr;
+    std::vector<DependencySPtr> source_deps;
+    std::vector<DependencySPtr> sink_deps;
+    int id = 0;
+    std::set<int> related_op_ids;
+
     virtual ~BasicSharedState() = default;
+
+    Dependency* create_source_dependency(int operator_id, int node_id, 
std::string name,
+                                         QueryContext* ctx);
+
+    Dependency* create_sink_dependency(int dest_id, int node_id, std::string 
name,
+                                       QueryContext* ctx);
 };
 
 class Dependency : public std::enable_shared_from_this<Dependency> {
@@ -94,22 +103,15 @@ public:
               _query_ctx(query_ctx) {}
     virtual ~Dependency() = default;
 
+    bool is_write_dependency() const { return _is_write_dependency; }
     [[nodiscard]] int id() const { return _id; }
     [[nodiscard]] virtual std::string name() const { return _name; }
-    virtual void add_child(std::shared_ptr<Dependency> child) {
-        LOG(FATAL) << "Only AndDependency could add child, it is wrong usage";
-    }
     BasicSharedState* shared_state() { return _shared_state; }
     void set_shared_state(BasicSharedState* shared_state) { _shared_state = 
shared_state; }
     virtual std::string debug_string(int indentation_level = 0);
 
     // Start the watcher. We use it to count how long this dependency block 
the current pipeline task.
-    void start_watcher() {
-        for (auto& child : _children) {
-            child->start_watcher();
-        }
-        _watcher.start();
-    }
+    void start_watcher() { _watcher.start(); }
     [[nodiscard]] int64_t watcher_elapse_time() { return 
_watcher.elapsed_time(); }
 
     // Which dependency current pipeline task is blocked by. `nullptr` if this 
dependency is ready.
@@ -118,21 +120,21 @@ public:
     void set_ready();
     void set_ready_to_read() {
         DCHECK(_is_write_dependency) << debug_string();
-        DCHECK(_shared_state->source_dep != nullptr) << debug_string();
-        _shared_state->source_dep->set_ready();
+        DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+        _shared_state->source_deps.front()->set_ready();
     }
     void set_block_to_read() {
         DCHECK(_is_write_dependency) << debug_string();
-        DCHECK(_shared_state->source_dep != nullptr) << debug_string();
-        _shared_state->source_dep->block();
+        DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+        _shared_state->source_deps.front()->block();
     }
     void set_ready_to_write() {
-        DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
-        _shared_state->sink_dep->set_ready();
+        DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+        _shared_state->sink_deps.front()->set_ready();
     }
     void set_block_to_write() {
-        DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
-        _shared_state->sink_dep->block();
+        DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+        _shared_state->sink_deps.front()->block();
     }
 
     // Notify downstream pipeline tasks this dependency is blocked.
@@ -172,7 +174,6 @@ protected:
 
     BasicSharedState* _shared_state = nullptr;
     MonotonicStopWatch _watcher;
-    std::list<std::shared_ptr<Dependency>> _children;
 
     std::mutex _task_lock;
     std::vector<PipelineXTask*> _blocked_task;
@@ -322,31 +323,6 @@ protected:
     std::shared_ptr<std::atomic_bool> _blocked_by_rf;
 };
 
-struct EmptySharedState final : public BasicSharedState {};
-
-struct AndSharedState final : public BasicSharedState {};
-
-class AndDependency final : public Dependency {
-public:
-    using SharedState = AndSharedState;
-    ENABLE_FACTORY_CREATOR(AndDependency);
-    AndDependency(int id, int node_id, QueryContext* query_ctx)
-            : Dependency(id, node_id, "AndDependency", query_ctx) {}
-
-    std::string debug_string(int indentation_level = 0) override;
-
-    void add_child(std::shared_ptr<Dependency> child) override { 
_children.push_back(child); }
-
-    [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
-        for (auto& child : Dependency::_children) {
-            if (auto* dep = child->is_blocked_by(task)) {
-                return dep;
-            }
-        }
-        return nullptr;
-    }
-};
-
 struct AggSharedState : public BasicSharedState {
 public:
     AggSharedState() {
@@ -661,25 +637,28 @@ public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
     LocalExchangeSharedState(int num_instances);
     std::unique_ptr<Exchanger> exchanger {};
-    std::vector<DependencySPtr> source_dependencies;
-    DependencySPtr sink_dependency;
     std::vector<MemTracker*> mem_trackers;
     std::atomic<size_t> mem_usage = 0;
     std::mutex le_lock;
+    void create_source_dependencies(int operator_id, int node_id, 
QueryContext* ctx) {
+        for (size_t i = 0; i < source_deps.size(); i++) {
+            source_deps[i] = std::make_shared<Dependency>(
+                    operator_id, node_id, 
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY", ctx);
+            source_deps[i]->set_shared_state(this);
+        }
+    };
     void sub_running_sink_operators();
     void _set_always_ready() {
-        for (auto& dep : source_dependencies) {
+        for (auto& dep : source_deps) {
             DCHECK(dep);
             dep->set_always_ready();
         }
     }
 
-    void set_dep_by_channel_id(DependencySPtr dep, int channel_id) {
-        source_dependencies[channel_id] = dep;
-    }
+    Dependency* get_dep_by_channel_id(int channel_id) { return 
source_deps[channel_id].get(); }
 
     void set_ready_to_read(int channel_id) {
-        auto& dep = source_dependencies[channel_id];
+        auto& dep = source_deps[channel_id];
         DCHECK(dep) << channel_id;
         dep->set_ready();
     }
@@ -700,13 +679,13 @@ public:
 
     void add_total_mem_usage(size_t delta) {
         if (mem_usage.fetch_add(delta) > 
config::local_exchange_buffer_mem_limit) {
-            sink_dependency->block();
+            sink_deps.front()->block();
         }
     }
 
     void sub_total_mem_usage(size_t delta) {
         if (mem_usage.fetch_sub(delta) <= 
config::local_exchange_buffer_mem_limit) {
-            sink_dependency->set_ready();
+            sink_deps.front()->set_ready();
         }
     }
 };
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 568871835c6..71a5a6b3c13 100644
--- 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -26,7 +26,6 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* 
state, LocalStateInfo&
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     _channel_id = info.task_idx;
-    _shared_state->set_dep_by_channel_id(info.dependency, _channel_id);
     _shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
     _exchanger = _shared_state->exchanger.get();
     DCHECK(_exchanger != nullptr);
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 433e4b48654..30ff16dde80 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -267,80 +267,24 @@ Status 
DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state,
     return Status::OK();
 }
 
-template <typename SharedStateType>
-constexpr bool NeedToCreate = true;
-template <>
-inline constexpr bool NeedToCreate<MultiCastSharedState> = false;
-template <>
-inline constexpr bool NeedToCreate<SetSharedState> = false;
-template <>
-inline constexpr bool NeedToCreate<UnionSharedState> = false;
-template <>
-inline constexpr bool NeedToCreate<LocalExchangeSharedState> = false;
-
 template <typename LocalStateType>
-void DataSinkOperatorX<LocalStateType>::get_dependency(
-        vector<DependencySPtr>& dependency,
-        std::map<int, std::shared_ptr<BasicSharedState>>& shared_states, 
QueryContext* ctx) {
-    std::shared_ptr<BasicSharedState> ss = nullptr;
-    if constexpr (NeedToCreate<typename LocalStateType::SharedStateType>) {
-        ss.reset(new typename LocalStateType::SharedStateType());
-        DCHECK(!shared_states.contains(dests_id().front()));
-        if constexpr (!std::is_same_v<typename 
LocalStateType::SharedStateType, FakeSharedState>) {
-            shared_states.insert({dests_id().front(), ss});
-        }
+std::shared_ptr<BasicSharedState> 
DataSinkOperatorX<LocalStateType>::create_shared_state() const {
+    if constexpr (std::is_same_v<typename LocalStateType::SharedStateType,
+                                 LocalExchangeSharedState>) {
+        return nullptr;
     } else if constexpr (std::is_same_v<typename 
LocalStateType::SharedStateType,
                                         MultiCastSharedState>) {
-        ss = 
((MultiCastDataStreamSinkOperatorX*)this)->create_multi_cast_data_streamer();
-        auto& dests = dests_id();
-        for (auto& dest_id : dests) {
-            DCHECK(!shared_states.contains(dest_id));
-            shared_states.insert({dest_id, ss});
-        }
-    }
-    if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, 
AndSharedState>) {
-        auto& dests = dests_id();
-        for (auto& dest_id : dests) {
-            dependency.push_back(std::make_shared<AndDependency>(dest_id, 
_node_id, ctx));
-            dependency.back()->set_shared_state(ss.get());
-        }
-    } else if constexpr (!std::is_same_v<typename 
LocalStateType::SharedStateType,
-                                         FakeSharedState>) {
-        auto& dests = dests_id();
-        for (auto& dest_id : dests) {
-            dependency.push_back(std::make_shared<Dependency>(dest_id, 
_node_id,
-                                                              _name + 
"_DEPENDENCY", true, ctx));
-            dependency.back()->set_shared_state(ss.get());
-        }
+        LOG(FATAL) << "should not reach here!";
+        return nullptr;
     } else {
-        dependency.push_back(nullptr);
-    }
-}
-
-template <typename LocalStateType>
-DependencySPtr OperatorX<LocalStateType>::get_dependency(
-        QueryContext* ctx, std::map<int, std::shared_ptr<BasicSharedState>>& 
shared_states) {
-    std::shared_ptr<BasicSharedState> ss = nullptr;
-    if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, 
SetSharedState>) {
+        std::shared_ptr<BasicSharedState> ss = nullptr;
         ss.reset(new typename LocalStateType::SharedStateType());
-        shared_states.insert({operator_id(), ss});
-    } else if constexpr (std::is_same_v<typename 
LocalStateType::SharedStateType,
-                                        UnionSharedState>) {
-        ss.reset(new typename LocalStateType::SharedStateType(
-                ((UnionSourceOperatorX*)this)->get_child_count()));
-        shared_states.insert({operator_id(), ss});
-    }
-    DependencySPtr dep = nullptr;
-    if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, 
AndSharedState>) {
-        dep = std::make_shared<AndDependency>(_operator_id, _node_id, ctx);
-    } else if constexpr (std::is_same_v<typename 
LocalStateType::SharedStateType,
-                                        FakeSharedState>) {
-        dep = std::make_shared<FakeDependency>(_operator_id, _node_id, ctx);
-    } else {
-        dep = std::make_shared<Dependency>(_operator_id, _node_id, _op_name + 
"_DEPENDENCY", ctx);
-        dep->set_shared_state(ss.get());
+        ss->id = operator_id();
+        for (auto& dest : dests_id()) {
+            ss->related_op_ids.insert(dest);
+        }
+        return ss;
     }
-    return dep;
 }
 
 template <typename LocalStateType>
@@ -373,25 +317,22 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
     _runtime_profile->set_is_sink(false);
     info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
     constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, 
FakeSharedState>;
-    _dependency = info.dependency.get();
     if constexpr (!is_fake_shared) {
-        _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
-                _runtime_profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
-        auto& deps = info.upstream_dependencies;
         if constexpr (std::is_same_v<LocalExchangeSharedState, 
SharedStateArg>) {
-            
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first.get());
-            _shared_state = _dependency->shared_state()->template 
cast<SharedStateArg>();
-
-            _shared_state->source_dep = info.dependency;
-        } else if constexpr (!std::is_same_v<SharedStateArg, EmptySharedState> 
&&
-                             !std::is_same_v<SharedStateArg, AndSharedState>) {
-            _dependency->set_shared_state(info.shared_state);
-            _shared_state = _dependency->shared_state()->template 
cast<SharedStateArg>();
-
-            _shared_state->source_dep = info.dependency;
-            if (!deps.empty()) {
-                _shared_state->sink_dep = deps.front();
-            }
+            _shared_state = 
info.le_state_map[_parent->operator_id()].first.get();
+
+            _dependency = _shared_state->get_dep_by_channel_id(info.task_idx);
+            _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+                    _runtime_profile, "WaitForDependency[" + 
_dependency->name() + "]Time", 1);
+        } else if (info.shared_state) {
+            // For UnionSourceOperator without children, there is no shared 
state.
+            _shared_state = info.shared_state->template cast<SharedStateArg>();
+
+            _dependency = _shared_state->create_source_dependency(
+                    _parent->operator_id(), _parent->node_id(), 
_parent->get_name(),
+                    state->get_query_ctx());
+            _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+                    _runtime_profile, "WaitForDependency[" + 
_dependency->name() + "]Time", 1);
         }
     }
 
@@ -445,20 +386,19 @@ Status 
PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
     _wait_for_finish_dependency_timer = ADD_TIMER(_profile, 
"PendingFinishDependency");
     constexpr auto is_fake_shared = std::is_same_v<SharedState, 
FakeSharedState>;
     if constexpr (!is_fake_shared) {
-        auto& deps = info.dependencies;
-        _dependency = deps.front().get();
         if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) {
             _dependency = 
info.le_state_map[_parent->dests_id().front()].second.get();
-        }
-        if (_dependency) {
             _shared_state = (SharedState*)_dependency->shared_state();
-            _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
-                    _profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
+        } else {
+            _shared_state = info.shared_state->template cast<SharedState>();
+            _dependency = _shared_state->create_sink_dependency(
+                    _parent->dests_id().front(), _parent->node_id(), 
_parent->get_name(),
+                    state->get_query_ctx());
         }
+        _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+                _profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
     } else {
-        auto& deps = info.dependencies;
-        deps.front() = std::make_shared<FakeDependency>(0, 0, 
state->get_query_ctx());
-        _dependency = deps.front().get();
+        _dependency = nullptr;
     }
     _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", 
TUnit::UNIT, 1);
     _open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1);
@@ -656,7 +596,6 @@ template class PipelineXSinkLocalState<UnionSharedState>;
 template class PipelineXSinkLocalState<PartitionSortNodeSharedState>;
 template class PipelineXSinkLocalState<MultiCastSharedState>;
 template class PipelineXSinkLocalState<SetSharedState>;
-template class PipelineXSinkLocalState<AndSharedState>;
 template class PipelineXSinkLocalState<LocalExchangeSharedState>;
 template class PipelineXSinkLocalState<BasicSharedState>;
 
@@ -671,8 +610,6 @@ template class PipelineXLocalState<MultiCastSharedState>;
 template class PipelineXLocalState<PartitionSortNodeSharedState>;
 template class PipelineXLocalState<SetSharedState>;
 template class PipelineXLocalState<LocalExchangeSharedState>;
-template class PipelineXLocalState<EmptySharedState>;
-template class PipelineXLocalState<AndSharedState>;
 template class PipelineXLocalState<BasicSharedState>;
 
 template class AsyncWriterSink<doris::vectorized::VFileResultWriter, 
ResultFileSinkOperatorX>;
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 5d1268e50c9..b9c02a935a6 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -31,13 +31,10 @@ namespace doris::pipeline {
 struct LocalStateInfo {
     RuntimeProfile* parent_profile = nullptr;
     const std::vector<TScanRangeParams> scan_ranges;
-    std::vector<DependencySPtr>& upstream_dependencies;
     BasicSharedState* shared_state;
     std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
             le_state_map;
     const int task_idx;
-
-    DependencySPtr dependency;
 };
 
 // This struct is used only for initializing local sink state.
@@ -45,7 +42,7 @@ struct LocalSinkStateInfo {
     const int task_idx;
     RuntimeProfile* parent_profile = nullptr;
     const int sender_id;
-    std::vector<DependencySPtr>& dependencies;
+    BasicSharedState* shared_state;
     std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
             le_state_map;
     const TDataSink& tsink;
@@ -100,7 +97,7 @@ public:
 
     [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) 
const = 0;
 
-    virtual Dependency* dependency() { return nullptr; }
+    virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
 
     // override in Scan
     virtual Dependency* finishdependency() { return nullptr; }
@@ -184,8 +181,6 @@ public:
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
-    [[nodiscard]] virtual DependencySPtr get_dependency(
-            QueryContext* ctx, std::map<int, 
std::shared_ptr<BasicSharedState>>& shared_states) = 0;
     [[nodiscard]] virtual DataDistribution required_data_distribution() const {
         return _child_x && _child_x->ignore_data_distribution() && !is_source()
                        ? DataDistribution(ExchangeType::PASSTHROUGH)
@@ -348,10 +343,6 @@ public:
     [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
         return state->get_local_state(operator_id())->template 
cast<LocalState>();
     }
-
-    DependencySPtr get_dependency(
-            QueryContext* ctx,
-            std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) 
override;
 };
 
 template <typename SharedStateArg = FakeSharedState>
@@ -372,7 +363,9 @@ public:
 
     [[nodiscard]] std::string debug_string(int indentation_level = 0) const 
override;
 
-    Dependency* dependency() override { return _dependency; }
+    std::vector<Dependency*> dependencies() const override {
+        return _dependency ? std::vector<Dependency*> {_dependency} : 
std::vector<Dependency*> {};
+    }
 
 protected:
     Dependency* _dependency = nullptr;
@@ -422,7 +415,7 @@ public:
 
     RuntimeProfile::Counter* rows_input_counter() { return 
_rows_input_counter; }
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
-    virtual Dependency* dependency() { return nullptr; }
+    virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
 
     // override in exchange sink , AsyncWriterSink
     virtual Dependency* finishdependency() { return nullptr; }
@@ -513,9 +506,7 @@ public:
         return reinterpret_cast<const TARGET&>(*this);
     }
 
-    virtual void get_dependency(std::vector<DependencySPtr>& dependency,
-                                std::map<int, 
std::shared_ptr<BasicSharedState>>& shared_states,
-                                QueryContext* ctx) = 0;
+    [[nodiscard]] virtual std::shared_ptr<BasicSharedState> 
create_shared_state() const = 0;
     [[nodiscard]] virtual DataDistribution required_data_distribution() const {
         return _child_x && _child_x->ignore_data_distribution()
                        ? DataDistribution(ExchangeType::PASSTHROUGH)
@@ -612,9 +603,7 @@ public:
     ~DataSinkOperatorX() override = default;
 
     Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) 
override;
-    void get_dependency(std::vector<DependencySPtr>& dependency,
-                        std::map<int, std::shared_ptr<BasicSharedState>>& 
shared_states,
-                        QueryContext* ctx) override;
+    std::shared_ptr<BasicSharedState> create_shared_state() const override;
 
     using LocalState = LocalStateType;
     [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
@@ -640,7 +629,9 @@ public:
 
     virtual std::string name_suffix() { return " (id=" + 
std::to_string(_parent->node_id()) + ")"; }
 
-    Dependency* dependency() override { return _dependency; }
+    std::vector<Dependency*> dependencies() const override {
+        return _dependency ? std::vector<Dependency*> {_dependency} : 
std::vector<Dependency*> {};
+    }
 
 protected:
     Dependency* _dependency = nullptr;
@@ -717,7 +708,9 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* block, bool eos);
 
-    Dependency* dependency() override { return _async_writer_dependency.get(); 
}
+    std::vector<Dependency*> dependencies() const override {
+        return {_async_writer_dependency.get()};
+    }
     Status close(RuntimeState* state, Status exec_status) override;
 
     Dependency* finishdependency() override { return _finish_dependency.get(); 
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index f13cf37b1fb..0d1d7784f88 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -610,9 +610,13 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
                     auto& deps = _dag[_pipeline->id()];
                     for (auto& dep : deps) {
                         if (pipeline_id_to_task.contains(dep)) {
-                            task->add_upstream_dependency(
-                                    
pipeline_id_to_task[dep]->get_downstream_dependency(),
-                                    
pipeline_id_to_task[dep]->get_shared_states());
+                            auto ss = 
pipeline_id_to_task[dep]->get_sink_shared_state();
+                            if (ss) {
+                                task->inject_shared_state(ss);
+                            } else {
+                                pipeline_id_to_task[dep]->inject_shared_state(
+                                        task->get_source_shared_state());
+                            }
                         }
                     }
                 }
@@ -781,7 +785,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
                                                  
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true,
                                                  
_runtime_state->get_query_ctx());
     sink_dep->set_shared_state(shared_state.get());
-    shared_state->sink_dependency = sink_dep;
+    shared_state->sink_deps.push_back(sink_dep);
     _op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
 
     // 3. Set two pipelines' operator list. For example, split pipeline [Scan 
- AggSink] to
@@ -804,6 +808,9 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
     }
     operator_xs.insert(operator_xs.begin(), source_op);
 
+    shared_state->create_source_dependencies(source_op->operator_id(), 
source_op->node_id(),
+                                             _query_ctx.get());
+
     // 5. Set children for two pipelines separately.
     std::vector<std::shared_ptr<Pipeline>> new_children;
     std::vector<PipelineId> edges_with_source;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 9630484f443..1cbf4e4940e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -191,8 +191,6 @@ private:
 #pragma clang diagnostic pop
 #endif
 
-    std::atomic_bool _canceled = false;
-
     // `_dag` manage dependencies between pipelines by pipeline ID. the 
indices will be blocked by members
     std::map<PipelineId, std::vector<PipelineId>> _dag;
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 7c0e81c994f..0459f1e3a0e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -60,10 +60,10 @@ PipelineXTask::PipelineXTask(
           _task_idx(task_idx),
           _execution_dep(state->get_query_ctx()->get_execution_dependency()) {
     _pipeline_task_watcher.start();
-    _sink->get_dependency(_downstream_dependency, _shared_states, 
state->get_query_ctx());
-    for (auto& op : _operators) {
-        _source_dependency.insert(
-                {op->operator_id(), op->get_dependency(state->get_query_ctx(), 
_shared_states)});
+
+    auto shared_state = _sink->create_shared_state();
+    if (shared_state) {
+        _sink_shared_state = shared_state;
     }
     pipeline->incr_created_tasks();
 }
@@ -82,7 +82,7 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& 
local_params, const
         LocalSinkStateInfo info {_task_idx,
                                  _task_profile.get(),
                                  local_params.sender_id,
-                                 get_downstream_dependency(),
+                                 get_sink_shared_state().get(),
                                  _le_state_map,
                                  tsink};
         RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
@@ -97,14 +97,8 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& 
local_params, const
 
     for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
         auto& op = _operators[op_idx];
-        auto& deps = get_upstream_dependency(op->operator_id());
-        LocalStateInfo info {parent_profile,
-                             scan_ranges,
-                             deps,
-                             get_shared_state(op->operator_id()),
-                             _le_state_map,
-                             _task_idx,
-                             _source_dependency[op->operator_id()]};
+        LocalStateInfo info {parent_profile, scan_ranges, 
get_op_shared_state(op->operator_id()),
+                             _le_state_map, _task_idx};
         RETURN_IF_ERROR(op->setup_local_state(_state, info));
         parent_profile = _state->get_local_state(op->operator_id())->profile();
         query_ctx->register_query_statistics(
@@ -126,9 +120,9 @@ Status PipelineXTask::_extract_dependencies() {
             return result.error();
         }
         auto* local_state = result.value();
-        auto* dep = local_state->dependency();
-        DCHECK(dep != nullptr);
-        _read_dependencies.push_back(dep);
+        const auto& deps = local_state->dependencies();
+        std::copy(deps.begin(), deps.end(),
+                  std::inserter(_read_dependencies, _read_dependencies.end()));
         auto* fin_dep = local_state->finishdependency();
         if (fin_dep) {
             _finish_dependencies.push_back(fin_dep);
@@ -136,9 +130,9 @@ Status PipelineXTask::_extract_dependencies() {
     }
     {
         auto* local_state = _state->get_sink_local_state();
-        auto* dep = local_state->dependency();
-        DCHECK(dep != nullptr);
-        _write_dependencies = dep;
+        _write_dependencies = local_state->dependencies();
+        DCHECK(std::all_of(_write_dependencies.begin(), 
_write_dependencies.end(),
+                           [](auto* dep) { return dep->is_write_dependency(); 
}));
         auto* fin_dep = local_state->finishdependency();
         if (fin_dep) {
             _finish_dependencies.push_back(fin_dep);
@@ -302,10 +296,8 @@ void PipelineXTask::finalize() {
     PipelineTask::finalize();
     std::unique_lock<std::mutex> lc(_release_lock);
     _finished = true;
-    std::vector<DependencySPtr> {}.swap(_downstream_dependency);
-    _upstream_dependency.clear();
-    _source_dependency.clear();
-    _shared_states.clear();
+    _sink_shared_state.reset();
+    _op_shared_states.clear();
     _le_state_map.clear();
 }
 
@@ -372,8 +364,10 @@ std::string PipelineXTask::debug_string() {
     }
 
     fmt::format_to(debug_string_buffer, "Write Dependency Information: \n");
-    fmt::format_to(debug_string_buffer, "{}. {}\n", i, 
_write_dependencies->debug_string(1));
-    i++;
+    for (size_t j = 0; j < _write_dependencies.size(); j++, i++) {
+        fmt::format_to(debug_string_buffer, "{}. {}\n", i,
+                       _write_dependencies[j]->debug_string(i + 1));
+    }
 
     if (_filter_dependency) {
         fmt::format_to(debug_string_buffer, "Runtime Filter Dependency 
Information: \n");
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 6e8a3773751..8707de4b54c 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -94,41 +94,36 @@ public:
 
     bool is_pending_finish() override { return _finish_blocked_dependency() != 
nullptr; }
 
-    std::vector<DependencySPtr>& get_downstream_dependency() { return 
_downstream_dependency; }
-    std::map<int, std::shared_ptr<BasicSharedState>>& get_shared_states() { 
return _shared_states; }
-
-    void add_upstream_dependency(std::vector<DependencySPtr>& 
multi_upstream_dependency,
-                                 std::map<int, 
std::shared_ptr<BasicSharedState>>& shared_states) {
-        for (auto dep : multi_upstream_dependency) {
-            int dst_id = dep->id();
-            if (!_upstream_dependency.contains(dst_id)) {
-                _upstream_dependency.insert({dst_id, {dep}});
-            } else {
-                _upstream_dependency[dst_id].push_back(dep);
-            }
+    std::shared_ptr<BasicSharedState> get_source_shared_state() {
+        return _op_shared_states.contains(_source->operator_id())
+                       ? _op_shared_states[_source->operator_id()]
+                       : nullptr;
+    }
 
-            if (shared_states.contains(dst_id) && 
!_shared_states.contains(dst_id)) {
-                // Shared state is created by upstream task's sink operator 
and shared by source operator of this task.
-                _shared_states.insert({dst_id, shared_states[dst_id]});
-            } else if (_shared_states.contains(dst_id) && 
!shared_states.contains(dst_id)) {
-                // Shared state is created by this task's source operator and 
shared by upstream task's sink operator.
-                shared_states.insert({dst_id, _shared_states[dst_id]});
+    void inject_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
+        if (!shared_state) {
+            return;
+        }
+        // Shared state is created by upstream task's sink operator and shared 
by source operator of this task.
+        for (auto& op : _operators) {
+            if (shared_state->related_op_ids.contains(op->operator_id())) {
+                _op_shared_states.insert({op->operator_id(), shared_state});
+                return;
             }
         }
-    }
-
-    std::vector<DependencySPtr>& get_upstream_dependency(int id) {
-        if (_upstream_dependency.find(id) == _upstream_dependency.end()) {
-            _upstream_dependency.insert({id, {}});
+        if (shared_state->related_op_ids.contains(_sink->dests_id().front())) {
+            DCHECK(_sink_shared_state == nullptr);
+            _sink_shared_state = shared_state;
         }
-        return _upstream_dependency[id];
     }
 
-    BasicSharedState* get_shared_state(int id) {
-        if (!_shared_states.contains(id)) {
+    std::shared_ptr<BasicSharedState> get_sink_shared_state() { return 
_sink_shared_state; }
+
+    BasicSharedState* get_op_shared_state(int id) {
+        if (!_op_shared_states.contains(id)) {
             return nullptr;
         }
-        return _shared_states[id].get();
+        return _op_shared_states[id].get();
     }
 
     bool is_pipelineX() const override { return true; }
@@ -161,10 +156,12 @@ public:
 
 private:
     Dependency* _write_blocked_dependency() {
-        _blocked_dep = _write_dependencies->is_blocked_by(this);
-        if (_blocked_dep != nullptr) {
-            static_cast<Dependency*>(_blocked_dep)->start_watcher();
-            return _blocked_dep;
+        for (auto* op_dep : _write_dependencies) {
+            _blocked_dep = op_dep->is_blocked_by(this);
+            if (_blocked_dep != nullptr) {
+                _blocked_dep->start_watcher();
+                return _blocked_dep;
+            }
         }
         return nullptr;
     }
@@ -203,18 +200,13 @@ private:
     DataSinkOperatorXPtr _sink;
 
     std::vector<Dependency*> _read_dependencies;
-    Dependency* _write_dependencies;
+    std::vector<Dependency*> _write_dependencies;
     std::vector<Dependency*> _finish_dependencies;
     RuntimeFilterDependency* _filter_dependency;
 
-    // Write dependencies of upstream pipeline tasks.
-    DependencyMap _upstream_dependency;
-    // Read dependencies of this pipeline task.
-    std::map<int, DependencySPtr> _source_dependency;
-    // Write dependencies of this pipeline tasks.
-    std::vector<DependencySPtr> _downstream_dependency;
     // All shared states of this pipeline task.
-    std::map<int, std::shared_ptr<BasicSharedState>> _shared_states;
+    std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
+    std::shared_ptr<BasicSharedState> _sink_shared_state;
     std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, 
std::shared_ptr<Dependency>>>
             _le_state_map;
     int _task_idx;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to