This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fe184e322a9 [code](pipelineX) refine some pipelineX code  (#28570)
fe184e322a9 is described below

commit fe184e322a90e680ebbbb951f0ba5a8299407c1a
Author: Mryange <[email protected]>
AuthorDate: Wed Dec 20 11:45:06 2023 +0800

    [code](pipelineX) refine some pipelineX code  (#28570)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 14 +++--
 be/src/pipeline/exec/exchange_sink_operator.h      | 16 ++++--
 be/src/pipeline/exec/exchange_source_operator.cpp  | 14 +++--
 be/src/pipeline/exec/exchange_source_operator.h    |  7 +--
 .../exec/multi_cast_data_stream_source.cpp         | 10 ++--
 .../pipeline/exec/multi_cast_data_stream_source.h  |  3 ++
 be/src/pipeline/exec/scan_operator.cpp             | 14 ++++-
 be/src/pipeline/exec/scan_operator.h               |  9 ++++
 be/src/pipeline/pipeline_x/operator.cpp            | 60 ++++++++++++----------
 be/src/pipeline/pipeline_x/operator.h              | 23 ++++++---
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  7 +--
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 31 ++++++-----
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |  3 +-
 be/src/runtime/fragment_mgr.cpp                    |  1 +
 14 files changed, 130 insertions(+), 82 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e70be36b774..0dd1dda415b 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -100,7 +100,7 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() 
const {
 }
 
 Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
-    RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
+    RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     _sender_id = info.sender_id;
@@ -174,9 +174,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
             id, p._dest_node_id, _sender_id, _state->be_number(), 
state->get_query_ctx());
 
     register_channels(_sink_buffer.get());
-
-    _exchange_sink_dependency = AndDependency::create_shared(
-            _parent->operator_id(), _parent->node_id(), 
state->get_query_ctx());
+    auto* _exchange_sink_dependency = _dependency;
     _queue_dependency = ExchangeSinkQueueDependency::create_shared(
             _parent->operator_id(), _parent->node_id(), 
state->get_query_ctx());
     _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
@@ -237,7 +235,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
 }
 
 Status ExchangeSinkLocalState::open(RuntimeState* state) {
-    RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
+    RETURN_IF_ERROR(Base::open(state));
     auto& p = _parent->cast<ExchangeSinkOperatorX>();
     if (p._part_type == TPartitionType::HASH_PARTITIONED ||
         p._part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
@@ -522,8 +520,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* 
state, Status exec_status)
 
 std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}",
-                   PipelineXSinkLocalState<>::debug_string(indentation_level));
+    fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
     fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, 
_busy_channels = {})",
                    _sink_buffer->_should_stop.load(), 
_sink_buffer->_busy_channels.load());
     return fmt::to_string(debug_string_buffer);
@@ -536,6 +533,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_close_timer);
     COUNTER_UPDATE(_wait_queue_timer, 
_queue_dependency->watcher_elapse_time());
+    COUNTER_SET(_wait_for_finish_dependency_timer, 
_finish_dependency->watcher_elapse_time());
     if (_broadcast_dependency) {
         COUNTER_UPDATE(_wait_broadcast_buffer_timer, 
_broadcast_dependency->watcher_elapse_time());
     }
@@ -545,7 +543,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     }
     _sink_buffer->update_profile(profile());
     _sink_buffer->close();
-    return PipelineXSinkLocalState<>::close(state, exec_status);
+    return Base::close(state, exec_status);
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index bc91e5dc19d..766f43dc80e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -144,20 +144,25 @@ public:
     // TODO(gabriel): blocked by memory
 };
 
-class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
+class ExchangeSinkLocalState final : public 
PipelineXSinkLocalState<AndDependency> {
     ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
+    using Base = PipelineXSinkLocalState<AndDependency>;
 
 public:
     ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
-            : PipelineXSinkLocalState<>(parent, state),
+            : Base(parent, state),
               current_channel_idx(0),
               only_local_exchange(false),
-              _serializer(this) {}
+              _serializer(this) {
+        _finish_dependency = std::make_shared<FinishDependency>(
+                parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
+                state->get_query_ctx());
+    }
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
-    Dependency* dependency() override { return 
_exchange_sink_dependency.get(); }
+    Dependency* finishdependency() override { return _finish_dependency.get(); 
}
     Status serialize_block(vectorized::Block* src, PBlock* dest, int 
num_receivers = 1);
     void 
register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
     Status get_next_available_buffer(vectorized::BroadcastPBlockHolder** 
holder);
@@ -231,11 +236,12 @@ private:
     vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
 
     std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
-    std::shared_ptr<AndDependency> _exchange_sink_dependency;
     std::shared_ptr<BroadcastDependency> _broadcast_dependency;
     std::vector<std::shared_ptr<LocalExchangeChannelDependency>> 
_local_channels_dependency;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     int _partition_count;
+
+    std::shared_ptr<Dependency> _finish_dependency;
 };
 
 class ExchangeSinkOperatorX final : public 
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 41e57fbde79..99c9e8e9fc2 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -41,12 +41,11 @@ bool ExchangeSourceOperator::is_pending_finish() const {
 }
 
 ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* 
parent)
-        : PipelineXLocalState<>(state, parent), num_rows_skipped(0), 
is_ready(false) {}
+        : Base(state, parent), num_rows_skipped(0), is_ready(false) {}
 
 std::string ExchangeLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}",
-                   PipelineXLocalState<>::debug_string(indentation_level));
+    fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
     fmt::format_to(debug_string_buffer, ", Queues: (");
     const auto& queues = stream_recvr->sender_queues();
     for (size_t i = 0; i < queues.size(); i++) {
@@ -68,15 +67,14 @@ std::string ExchangeSourceOperatorX::debug_string(int 
indentation_level) const {
 }
 
 Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
-    RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+    RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, p.input_row_desc(), state->fragment_instance_id(), 
p.node_id(), p.num_senders(),
             profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
-    source_dependency = AndDependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
-                                                     state->get_query_ctx());
+    auto* source_dependency = _dependency;
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
     metrics.resize(queues.size());
@@ -101,7 +99,7 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
 Status ExchangeLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
+    RETURN_IF_ERROR(Base::open(state));
     return Status::OK();
 }
 
@@ -215,7 +213,7 @@ Status ExchangeLocalState::close(RuntimeState* state) {
     if (_parent->cast<ExchangeSourceOperatorX>()._is_merging) {
         vsort_exec_exprs.close(state);
     }
-    return PipelineXLocalState<>::close(state);
+    return Base::close(state);
 }
 
 Status ExchangeSourceOperatorX::close(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index 49211321886..a9848be23c7 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -71,21 +71,22 @@ private:
 };
 
 class ExchangeSourceOperatorX;
-class ExchangeLocalState final : public PipelineXLocalState<> {
+class ExchangeLocalState final : public PipelineXLocalState<AndDependency> {
     ENABLE_FACTORY_CREATOR(ExchangeLocalState);
+
+public:
+    using Base = PipelineXLocalState<AndDependency>;
     ExchangeLocalState(RuntimeState* state, OperatorXBase* parent);
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
-    Dependency* dependency() override { return source_dependency.get(); }
     std::string debug_string(int indentation_level) const override;
     std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
     doris::vectorized::VSortExecExprs vsort_exec_exprs;
     int64_t num_rows_skipped;
     bool is_ready;
 
-    std::shared_ptr<AndDependency> source_dependency;
     std::vector<std::shared_ptr<ExchangeDataDependency>> deps;
 
     std::vector<RuntimeProfile::Counter*> metrics;
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 4c73f6ecb28..a4f3ff55a5c 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -128,9 +128,13 @@ RuntimeProfile* 
MultiCastDataStreamerSourceOperator::get_runtime_profile() const
 
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState*
 state,
                                                                          
OperatorXBase* parent)
         : Base(state, parent),
-          vectorized::RuntimeFilterConsumer(
-                  static_cast<Parent*>(parent)->dest_id_from_sink(), 
parent->runtime_filter_descs(),
-                  static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {};
+          
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
+                                            parent->runtime_filter_descs(),
+                                            
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
+    _filter_dependency = std::make_shared<RuntimeFilterDependency>(
+            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FILTER_DEPENDENCY",
+            state->get_query_ctx());
+};
 
 Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 86034a76ce7..baeca2ca7b1 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -120,8 +120,11 @@ public:
 
     friend class MultiCastDataStreamerSourceOperatorX;
 
+    RuntimeFilterDependency* filterdependency() override { return 
_filter_dependency.get(); }
+
 private:
     vectorized::VExprContextSPtrs _output_expr_contexts;
+    std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
 };
 
 class MultiCastDataStreamerSourceOperatorX final
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index d8d3958a779..398765c8cc6 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -100,7 +100,14 @@ std::string ScanOperator::debug_string() const {
 
 template <typename Derived>
 ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* 
parent)
-        : ScanLocalStateBase(state, parent) {}
+        : ScanLocalStateBase(state, parent) {
+    _finish_dependency = std::make_shared<FinishDependency>(
+            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
+            state->get_query_ctx());
+    _filter_dependency = std::make_shared<RuntimeFilterDependency>(
+            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FILTER_DEPENDENCY",
+            state->get_query_ctx());
+}
 
 template <typename Derived>
 bool ScanLocalState<Derived>::ready_to_read() {
@@ -1311,6 +1318,9 @@ Status ScanLocalState<Derived>::_init_profile() {
 
     _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, 
"MaxScannerThreadNum", TUnit::UNIT);
 
+    _wait_for_finish_dependency_timer =
+            ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
+
     return Status::OK();
 }
 
@@ -1442,7 +1452,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* 
state) {
         
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), 
state);
     }
     COUNTER_SET(_wait_for_dependency_timer, 
_scan_dependency->watcher_elapse_time());
-
+    COUNTER_SET(_wait_for_finish_dependency_timer, 
_finish_dependency->watcher_elapse_time());
     return PipelineXLocalState<>::close(state);
 }
 
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 603f3804aae..78cb399427e 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -171,6 +171,8 @@ protected:
     RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
     // time of prefilter input block from scanner
     RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
+
+    RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
 };
 
 template <typename LocalStateType>
@@ -211,6 +213,9 @@ class ScanLocalState : public ScanLocalStateBase {
 
     Dependency* dependency() override { return _scan_dependency.get(); }
 
+    RuntimeFilterDependency* filterdependency() override { return 
_filter_dependency.get(); };
+    Dependency* finishdependency() override { return _finish_dependency.get(); 
}
+
 protected:
     template <typename LocalStateType>
     friend class ScanOperatorX;
@@ -405,6 +410,10 @@ protected:
     std::atomic<bool> _eos = false;
 
     std::mutex _block_lock;
+
+    std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
+
+    std::shared_ptr<Dependency> _finish_dependency;
 };
 
 template <typename LocalStateType>
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index a466b8b3b67..55fa103e193 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -308,25 +308,14 @@ Status 
OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt
 
 
PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* 
parent,
                                                          RuntimeState* state)
-        : _parent(parent),
-          _state(state),
-          _finish_dependency(new FinishDependency(parent->operator_id(), 
parent->node_id(),
-                                                  parent->get_name() + 
"_FINISH_DEPENDENCY",
-                                                  state->get_query_ctx())) {}
+        : _parent(parent), _state(state) {}
 
 PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, 
OperatorXBase* parent)
         : _num_rows_returned(0),
           _rows_returned_counter(nullptr),
           _peak_memory_usage_counter(nullptr),
           _parent(parent),
-          _state(state),
-          _finish_dependency(new FinishDependency(parent->operator_id(), 
parent->node_id(),
-                                                  parent->get_name() + 
"_FINISH_DEPENDENCY",
-                                                  state->get_query_ctx())) {
-    _filter_dependency = std::make_shared<RuntimeFilterDependency>(
-            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FILTER_DEPENDENCY",
-            state->get_query_ctx());
-}
+          _state(state) {}
 
 template <typename DependencyType>
 Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, 
LocalStateInfo& info) {
@@ -334,22 +323,30 @@ Status 
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
     _runtime_profile->set_metadata(_parent->node_id());
     _runtime_profile->set_is_sink(false);
     info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
-    _wait_for_finish_dependency_timer =
-            ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
+    constexpr auto is_fake_shared =
+            std::is_same_v<typename DependencyType::SharedState, 
FakeSharedState>;
     _dependency = (DependencyType*)info.dependency.get();
     if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
+        _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
+                _runtime_profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
         auto& deps = info.upstream_dependencies;
         if constexpr (std::is_same_v<LocalExchangeSourceDependency, 
DependencyType>) {
             
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first);
-        } else {
+            _shared_state =
+                    (typename 
DependencyType::SharedState*)_dependency->shared_state().get();
+            _shared_state->ref();
+
+            _shared_state->source_dep = _dependency;
+            _shared_state->sink_dep = deps.front().get();
+        } else if constexpr (!is_fake_shared) {
             _dependency->set_shared_state(deps.front()->shared_state());
+            _shared_state =
+                    (typename 
DependencyType::SharedState*)_dependency->shared_state().get();
+            _shared_state->ref();
+
+            _shared_state->source_dep = _dependency;
+            _shared_state->sink_dep = deps.front().get();
         }
-        _shared_state = (typename 
DependencyType::SharedState*)_dependency->shared_state().get();
-        _shared_state->ref();
-        _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
-                _runtime_profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
-        _shared_state->source_dep = _dependency;
-        _shared_state->sink_dep = deps.front().get();
     }
 
     _conjuncts.resize(_parent->_conjuncts.size());
@@ -386,7 +383,6 @@ Status 
PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
     if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
     }
-    COUNTER_SET(_wait_for_finish_dependency_timer, 
_finish_dependency->watcher_elapse_time());
     if (_rows_returned_counter != nullptr) {
         COUNTER_SET(_rows_returned_counter, _num_rows_returned);
     }
@@ -405,6 +401,8 @@ Status 
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
     _profile->set_metadata(_parent->node_id());
     _profile->set_is_sink(true);
     _wait_for_finish_dependency_timer = ADD_TIMER(_profile, 
"PendingFinishDependency");
+    constexpr auto is_fake_shared =
+            std::is_same_v<typename DependencyType::SharedState, 
FakeSharedState>;
     if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
         auto& deps = info.dependencys;
         _dependency = (DependencyType*)deps.front().get();
@@ -412,12 +410,18 @@ Status 
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
             _dependency = 
info.le_state_map[_parent->dests_id().front()].second.get();
         }
         if (_dependency) {
-            _shared_state =
-                    (typename 
DependencyType::SharedState*)_dependency->shared_state().get();
+            if constexpr (!is_fake_shared) {
+                _shared_state =
+                        (typename 
DependencyType::SharedState*)_dependency->shared_state().get();
+            }
+
             _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                     _profile, "WaitForDependency[" + _dependency->name() + 
"]Time", 1);
         }
-        _shared_state->ref();
+        if constexpr (!is_fake_shared) {
+            _shared_state->ref();
+        }
+
     } else {
         auto& deps = info.dependencys;
         deps.front() = std::make_shared<FakeDependency>(0, 0, 
state->get_query_ctx());
@@ -446,7 +450,6 @@ Status 
PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
     if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
         COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
     }
-    COUNTER_SET(_wait_for_finish_dependency_timer, 
_finish_dependency->watcher_elapse_time());
     if (_peak_memory_usage_counter) {
         _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
     }
@@ -536,6 +539,7 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* 
state, Status exec_s
         return Status::OK();
     }
     COUNTER_SET(_wait_for_dependency_timer, 
_async_writer_dependency->watcher_elapse_time());
+    COUNTER_SET(_wait_for_finish_dependency_timer, 
_finish_dependency->watcher_elapse_time());
     // if the init failed, the _writer may be nullptr. so here need check
     if (_writer) {
         if (_writer->need_normal_close()) {
@@ -630,6 +634,7 @@ template class 
PipelineXSinkLocalState<MultiCastSinkDependency>;
 template class PipelineXSinkLocalState<SetSinkDependency>;
 template class PipelineXSinkLocalState<SetProbeSinkDependency>;
 template class PipelineXSinkLocalState<LocalExchangeSinkDependency>;
+template class PipelineXSinkLocalState<AndDependency>;
 
 template class PipelineXLocalState<HashJoinProbeDependency>;
 template class PipelineXLocalState<SortSourceDependency>;
@@ -642,6 +647,7 @@ template class 
PipelineXLocalState<MultiCastSourceDependency>;
 template class PipelineXLocalState<PartitionSortSourceDependency>;
 template class PipelineXLocalState<SetSourceDependency>;
 template class PipelineXLocalState<LocalExchangeSourceDependency>;
+template class PipelineXLocalState<AndDependency>;
 
 template class AsyncWriterSink<doris::vectorized::VFileResultWriter, 
ResultFileSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, 
JdbcTableSinkOperatorX>;
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 783b15ac7eb..d697970311d 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -102,8 +102,10 @@ public:
 
     virtual Dependency* dependency() { return nullptr; }
 
-    Dependency* finishdependency() { return _finish_dependency.get(); }
-    RuntimeFilterDependency* filterdependency() { return 
_filter_dependency.get(); }
+    // override in Scan
+    virtual Dependency* finishdependency() { return nullptr; }
+    // override in Scan, MultiCastDataStreamSource
+    virtual RuntimeFilterDependency* filterdependency() { return nullptr; }
 
 protected:
     friend class OperatorXBase;
@@ -121,7 +123,6 @@ protected:
     RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
     RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
     RuntimeProfile::Counter* _memory_used_counter = nullptr;
-    RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
     RuntimeProfile::Counter* _projection_timer = nullptr;
     RuntimeProfile::Counter* _exec_timer = nullptr;
     // Account for peak memory used by this node
@@ -135,8 +136,6 @@ protected:
     vectorized::VExprContextSPtrs _projections;
     bool _closed = false;
     vectorized::Block _origin_block;
-    std::shared_ptr<Dependency> _finish_dependency;
-    std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
 };
 
 class OperatorXBase : public OperatorBase {
@@ -397,7 +396,8 @@ public:
     RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
     virtual Dependency* dependency() { return nullptr; }
 
-    Dependency* finishdependency() { return _finish_dependency.get(); }
+    // override in exchange sink, AsyncWriterSink
+    virtual Dependency* finishdependency() { return nullptr; }
 
 protected:
     DataSinkOperatorXBase* _parent = nullptr;
@@ -424,7 +424,6 @@ protected:
     RuntimeProfile::Counter* _exec_timer = nullptr;
     RuntimeProfile::Counter* _memory_used_counter = nullptr;
     RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
-    std::shared_ptr<Dependency> _finish_dependency;
 };
 
 class DataSinkOperatorXBase : public OperatorBase {
@@ -659,7 +658,11 @@ class AsyncWriterSink : public 
PipelineXSinkLocalState<FakeDependency> {
 public:
     using Base = PipelineXSinkLocalState<FakeDependency>;
     AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
-            : Base(parent, state), _async_writer_dependency(nullptr) {}
+            : Base(parent, state), _async_writer_dependency(nullptr) {
+        _finish_dependency = std::make_shared<FinishDependency>(
+                parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FINISH_DEPENDENCY",
+                state->get_query_ctx());
+    }
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
@@ -672,11 +675,15 @@ public:
 
     Status try_close(RuntimeState* state, Status exec_status) override;
 
+    Dependency* finishdependency() override { return _finish_dependency.get(); 
}
+
 protected:
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
     std::unique_ptr<Writer> _writer;
 
     std::shared_ptr<AsyncWriterDependency> _async_writer_dependency;
+
+    std::shared_ptr<Dependency> _finish_dependency;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 31ba7b0b882..6cb666da41d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -601,8 +601,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
 
         auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t 
pip_idx) {
             DCHECK(pipeline_id_to_profile[pip_idx]);
-            
RETURN_IF_ERROR(task->prepare(get_task_runtime_state(task->task_id()), 
local_params,
-                                          request.fragment.output_sink));
+            RETURN_IF_ERROR(task->prepare(local_params, 
request.fragment.output_sink));
             return Status::OK();
         };
 
@@ -828,7 +827,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
     OperatorXPtr source_op;
     source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
     RETURN_IF_ERROR(source_op->init(exchange_type));
-    if (operator_xs.size() > 0) {
+    if (!operator_xs.empty()) {
         RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
     }
     operator_xs.insert(operator_xs.begin(), source_op);
@@ -878,6 +877,8 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
                                                   const DescriptorTbl& descs, 
OperatorXPtr& op,
                                                   PipelinePtr& cur_pipe, int 
parent_idx,
                                                   int child_idx) {
+    // We construct the operator directly from Thrift because the given array is in preorder-traversal order.
+    // Therefore, we need to use a stack-like structure here.
     _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
     std::stringstream error_msg;
     switch (tnode.node_type) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 283b7851e4c..be62fcac213 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -56,7 +56,7 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t 
task_id, RuntimeSta
           _source(_operators.front()),
           _root(_operators.back()),
           _sink(pipeline->sink_shared_pointer()),
-          _le_state_map(le_state_map),
+          _le_state_map(std::move(le_state_map)),
           _task_idx(task_idx),
           _execution_dep(state->get_query_ctx()->get_execution_dependency()) {
     _pipeline_task_watcher.start();
@@ -67,15 +67,13 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, 
uint32_t task_id, RuntimeSta
     pipeline->incr_created_tasks();
 }
 
-Status PipelineXTask::prepare(RuntimeState* state, const 
TPipelineInstanceParams& local_params,
-                              const TDataSink& tsink) {
+Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, 
const TDataSink& tsink) {
     DCHECK(_sink);
     DCHECK(_cur_state == PipelineTaskState::NOT_READY) << 
get_state_name(_cur_state);
     _init_profile();
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_CPU_TIMER(_task_cpu_timer);
     SCOPED_TIMER(_prepare_timer);
-    DCHECK_EQ(state, _state);
 
     {
         // set sink local state
@@ -85,20 +83,20 @@ Status PipelineXTask::prepare(RuntimeState* state, const 
TPipelineInstanceParams
                                  get_downstream_dependency(),
                                  _le_state_map,
                                  tsink};
-        RETURN_IF_ERROR(_sink->setup_local_state(state, info));
+        RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
     }
 
     std::vector<TScanRangeParams> no_scan_ranges;
     auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
                                          _operators.front()->node_id(), 
no_scan_ranges);
-    auto* parent_profile = 
state->get_sink_local_state(_sink->operator_id())->profile();
+    auto* parent_profile = 
_state->get_sink_local_state(_sink->operator_id())->profile();
     for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
         auto& op = _operators[op_idx];
         auto& deps = get_upstream_dependency(op->operator_id());
         LocalStateInfo info {parent_profile, scan_ranges, deps,
                              _le_state_map,  _task_idx,   
_source_dependency[op->operator_id()]};
-        RETURN_IF_ERROR(op->setup_local_state(state, info));
-        parent_profile = state->get_local_state(op->operator_id())->profile();
+        RETURN_IF_ERROR(op->setup_local_state(_state, info));
+        parent_profile = _state->get_local_state(op->operator_id())->profile();
     }
 
     _block = doris::vectorized::Block::create_unique();
@@ -120,7 +118,9 @@ Status PipelineXTask::_extract_dependencies() {
         DCHECK(dep != nullptr);
         _read_dependencies.push_back(dep);
         auto* fin_dep = local_state->finishdependency();
-        _finish_dependencies.push_back(fin_dep);
+        if (fin_dep) {
+            _finish_dependencies.push_back(fin_dep);
+        }
     }
     {
         auto result = _state->get_sink_local_state_result(_sink->operator_id());
@@ -132,7 +132,9 @@ Status PipelineXTask::_extract_dependencies() {
         DCHECK(dep != nullptr);
         _write_dependencies = dep;
         auto* fin_dep = local_state->finishdependency();
-        _finish_dependencies.push_back(fin_dep);
+        if (fin_dep) {
+            _finish_dependencies.push_back(fin_dep);
+        }
     }
     {
         auto result = _state->get_local_state_result(_source->operator_id());
@@ -193,6 +195,7 @@ Status PipelineXTask::_open() {
         for (size_t i = 0; i < 2; i++) {
             auto st = local_state->open(_state);
             if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
+                DCHECK(_filter_dependency);
                 _blocked_dep = _filter_dependency->is_blocked_by(this);
                 if (_blocked_dep) {
                     set_state(PipelineTaskState::BLOCKED_FOR_RF);
@@ -377,9 +380,11 @@ std::string PipelineXTask::debug_string() {
     fmt::format_to(debug_string_buffer, "{}. {}\n", i, _write_dependencies->debug_string(1));
     i++;
 
-    fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n");
-    fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1));
-    i++;
+    if (_filter_dependency) {
+        fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n");
+        fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1));
+        i++;
+    }
 
     fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
     for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index fc633339c31..f7b996f40a7 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -62,8 +62,7 @@ public:
         return Status::InternalError("Should not reach here!");
     }
 
-    Status prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
-                   const TDataSink& tsink);
+    Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink);
 
     Status execute(bool* eos) override;
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 90ebbff37a2..c54786e5428 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -220,6 +220,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
     DCHECK(req.runtime_state != nullptr);
 
     if (req.query_statistics) {
+        // use to report 'insert into select'
         TQueryStatistics queryStatistics;
         DCHECK(req.query_statistics->collect_dml_statistics());
         req.query_statistics->to_thrift(&queryStatistics);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to