This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3273e0e6353 [refactor](pipelineX)do not override dependency() function 
in pipelineX (#28848)
3273e0e6353 is described below

commit 3273e0e6353e4c9c03b5dc44912b0ceee3bbcdb1
Author: Mryange <[email protected]>
AuthorDate: Mon Dec 25 10:36:31 2023 +0800

    [refactor](pipelineX)do not override dependency() function in pipelineX 
(#28848)
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp      |  6 +++---
 be/src/pipeline/exec/hashjoin_build_sink.h        | 15 ++++-----------
 be/src/pipeline/exec/join_build_sink_operator.cpp |  2 +-
 be/src/pipeline/exec/result_sink_operator.cpp     | 15 +++++++--------
 be/src/pipeline/exec/result_sink_operator.h       |  9 +++++----
 be/src/pipeline/exec/scan_operator.cpp            | 11 +++++------
 be/src/pipeline/exec/scan_operator.h              |  9 +++++----
 be/src/pipeline/pipeline_x/operator.cpp           |  4 +++-
 be/src/pipeline/pipeline_x/operator.h             |  8 ++++++++
 9 files changed, 41 insertions(+), 38 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 8cd0376a957..8c1ff852433 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -22,6 +22,7 @@
 #include "exprs/bloom_filter_func.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_x/dependency.h"
 #include "vec/exec/join/vhash_join_node.h"
 #include "vec/utils/template_helpers.hpp"
 
@@ -45,8 +46,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
     RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    _shared_hash_table_dependency = SharedHashTableDependency::create_shared(
-            _parent->operator_id(), _parent->node_id(), 
state->get_query_ctx());
+    _shared_hash_table_dependency = dependency_sptr();
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
 
@@ -119,7 +119,7 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* 
state) {
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
 
     for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
-        if (auto bf = _runtime_filters[i]->get_bloomfilter()) {
+        if (auto* bf = _runtime_filters[i]->get_bloomfilter()) {
             RETURN_IF_ERROR(bf->init_with_fixed_length());
         }
     }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 24faa4115dd..b5ae146c182 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -48,24 +48,18 @@ class HashJoinBuildSinkOperatorX;
 
 class SharedHashTableDependency final : public Dependency {
 public:
+    using SharedState = HashJoinSharedState;
     ENABLE_FACTORY_CREATOR(SharedHashTableDependency);
     SharedHashTableDependency(int id, int node_id, QueryContext* query_ctx)
-            : Dependency(id, node_id, "SharedHashTableDependency", true, 
query_ctx) {}
+            : Dependency(id, node_id, "SharedHashTableBuildDependency", true, 
query_ctx) {}
     ~SharedHashTableDependency() override = default;
 };
 
-class HashJoinBuildSinkDependency final : public Dependency {
-public:
-    using SharedState = HashJoinSharedState;
-    HashJoinBuildSinkDependency(int id, int node_id, QueryContext* query_ctx)
-            : Dependency(id, node_id, "HashJoinBuildSinkDependency", true, 
query_ctx) {}
-    ~HashJoinBuildSinkDependency() override = default;
-};
-
 class HashJoinBuildSinkLocalState final
-        : public JoinBuildSinkLocalState<HashJoinBuildSinkDependency, 
HashJoinBuildSinkLocalState> {
+        : public JoinBuildSinkLocalState<SharedHashTableDependency, 
HashJoinBuildSinkLocalState> {
 public:
     ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState);
+    using Base = JoinBuildSinkLocalState<SharedHashTableDependency, 
HashJoinBuildSinkLocalState>;
     using Parent = HashJoinBuildSinkOperatorX;
     HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state);
     ~HashJoinBuildSinkLocalState() override = default;
@@ -86,7 +80,6 @@ public:
     void add_hash_buckets_filled_info(const std::string& info) const {
         _profile->add_info_string("HashTableFilledBuckets", info);
     }
-    Dependency* dependency() override { return 
_shared_hash_table_dependency.get(); }
 
 protected:
     void _hash_table_init(RuntimeState* state);
diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp 
b/be/src/pipeline/exec/join_build_sink_operator.cpp
index e2cc361c22f..798b5d86b00 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.cpp
+++ b/be/src/pipeline/exec/join_build_sink_operator.cpp
@@ -115,7 +115,7 @@ void 
JoinBuildSinkOperatorX<LocalStateType>::_init_join_op() {
 }
 
 template class JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>;
-template class JoinBuildSinkLocalState<HashJoinBuildSinkDependency, 
HashJoinBuildSinkLocalState>;
+template class JoinBuildSinkLocalState<SharedHashTableDependency, 
HashJoinBuildSinkLocalState>;
 template class JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>;
 template class JoinBuildSinkLocalState<NestedLoopJoinBuildSinkDependency,
                                        NestedLoopJoinBuildSinkLocalState>;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index 33256dc7f00..dcf0d996c62 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -51,7 +51,7 @@ bool ResultSinkOperator::can_write() {
 }
 
 Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
-    RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
+    RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     static const std::string timer_name = "WaitForDependencyTime";
@@ -62,18 +62,17 @@ Status ResultSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info)
     RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
             state->fragment_instance_id(), 
vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
             state->execution_timeout()));
-    _result_sink_dependency = ResultSinkDependency::create_shared(
-            _parent->operator_id(), _parent->node_id(), 
state->get_query_ctx());
+    auto result_sink_dependency = dependency_sptr();
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", 
TUnit::UNIT, 1);
     _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", 
TUnit::UNIT, 1);
-    
((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency);
+    
((PipBufferControlBlock*)_sender.get())->set_dependency(result_sink_dependency);
     return Status::OK();
 }
 
 Status ResultSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
+    RETURN_IF_ERROR(Base::open(state));
     auto& p = _parent->cast<ResultSinkOperatorX>();
     _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
     for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
@@ -150,7 +149,7 @@ Status 
ResultSinkOperatorX::_second_phase_fetch_data(RuntimeState* state,
                                                      vectorized::Block* 
final_block) {
     auto row_id_col = final_block->get_by_position(final_block->columns() - 1);
     CHECK(row_id_col.name == BeConsts::ROWID_COL);
-    auto tuple_desc = _row_desc.tuple_descriptors()[0];
+    auto* tuple_desc = _row_desc.tuple_descriptors()[0];
     FetchOption fetch_option;
     fetch_option.desc = tuple_desc;
     fetch_option.t_fetch_opt = _fetch_option;
@@ -167,7 +166,7 @@ Status ResultSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     }
     SCOPED_TIMER(_close_timer);
     SCOPED_TIMER(exec_time_counter());
-    COUNTER_SET(_wait_for_dependency_timer, 
_result_sink_dependency->watcher_elapse_time());
+    COUNTER_SET(_wait_for_dependency_timer, 
_dependency->watcher_elapse_time());
     Status final_status = exec_status;
     if (_writer) {
         // close the writer
@@ -189,7 +188,7 @@ Status ResultSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
             time(nullptr) + config::result_buffer_cancelled_interval_time,
             state->fragment_instance_id()));
-    RETURN_IF_ERROR(PipelineXSinkLocalState<>::close(state, exec_status));
+    RETURN_IF_ERROR(Base::close(state, exec_status));
     return final_status;
 }
 
diff --git a/be/src/pipeline/exec/result_sink_operator.h 
b/be/src/pipeline/exec/result_sink_operator.h
index e3d56f176e6..bed6aed8969 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -20,6 +20,7 @@
 #include <stdint.h>
 
 #include "operator.h"
+#include "pipeline/pipeline_x/dependency.h"
 #include "pipeline/pipeline_x/operator.h"
 #include "vec/sink/vresult_sink.h"
 
@@ -45,23 +46,24 @@ public:
 
 class ResultSinkDependency final : public Dependency {
 public:
+    using SharedState = BasicSharedState;
     ENABLE_FACTORY_CREATOR(ResultSinkDependency);
     ResultSinkDependency(int id, int node_id, QueryContext* query_ctx)
             : Dependency(id, node_id, "ResultSinkDependency", true, query_ctx) 
{}
     ~ResultSinkDependency() override = default;
 };
 
-class ResultSinkLocalState final : public PipelineXSinkLocalState<> {
+class ResultSinkLocalState final : public 
PipelineXSinkLocalState<ResultSinkDependency> {
     ENABLE_FACTORY_CREATOR(ResultSinkLocalState);
+    using Base = PipelineXSinkLocalState<ResultSinkDependency>;
 
 public:
     ResultSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
-            : PipelineXSinkLocalState<>(parent, state) {}
+            : Base(parent, state) {}
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
-    Dependency* dependency() override { return _result_sink_dependency.get(); }
     RuntimeProfile::Counter* blocks_sent_counter() { return 
_blocks_sent_counter; }
     RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }
 
@@ -72,7 +74,6 @@ private:
 
     std::shared_ptr<BufferControlBlock> _sender;
     std::shared_ptr<ResultWriter> _writer;
-    std::shared_ptr<ResultSinkDependency> _result_sink_dependency;
     RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
     RuntimeProfile::Counter* _rows_sent_counter = nullptr;
 };
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 05d9c7292f7..af9529798a3 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -121,15 +121,13 @@ bool ScanLocalState<Derived>::should_run_serial() const {
 
 template <typename Derived>
 Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& 
info) {
-    RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+    RETURN_IF_ERROR(PipelineXLocalState<ScanDependency>::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<typename Derived::Parent>();
     RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, 
p.ignore_data_distribution()));
 
-    _scan_dependency = 
ScanDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
-                                                     
PipelineXLocalState<>::_parent->node_id(),
-                                                     state->get_query_ctx());
+    _scan_dependency = dependency_sptr();
 
     set_scan_ranges(state, info.scan_ranges);
     _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
@@ -567,7 +565,8 @@ template <typename Derived>
 std::string ScanLocalState<Derived>::debug_string(int indentation_level) const 
{
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}, _eos = {}",
-                   PipelineXLocalState<>::debug_string(indentation_level), 
_eos.load());
+                   
PipelineXLocalState<ScanDependency>::debug_string(indentation_level),
+                   _eos.load());
     if (_scanner_ctx) {
         fmt::format_to(debug_string_buffer, "");
         fmt::format_to(
@@ -1457,7 +1456,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* 
state) {
     COUNTER_SET(_wait_for_finish_dependency_timer, 
_finish_dependency->watcher_elapse_time());
     COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
 
-    return PipelineXLocalState<>::close(state);
+    return PipelineXLocalState<ScanDependency>::close(state);
 }
 
 template <typename LocalStateType>
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 3690e9eb39c..afbcaea0a63 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -24,6 +24,7 @@
 
 #include "common/status.h"
 #include "operator.h"
+#include "pipeline/pipeline_x/dependency.h"
 #include "pipeline/pipeline_x/operator.h"
 #include "runtime/descriptors.h"
 #include "vec/exec/scan/vscan_node.h"
@@ -59,6 +60,7 @@ public:
 
 class ScanDependency final : public Dependency {
 public:
+    using SharedState = FakeSharedState;
     ENABLE_FACTORY_CREATOR(ScanDependency);
     ScanDependency(int id, int node_id, QueryContext* query_ctx)
             : Dependency(id, node_id, "ScanDependency", query_ctx) {}
@@ -98,10 +100,11 @@ private:
     std::mutex _always_done_lock;
 };
 
-class ScanLocalStateBase : public PipelineXLocalState<>, public 
vectorized::RuntimeFilterConsumer {
+class ScanLocalStateBase : public PipelineXLocalState<ScanDependency>,
+                           public vectorized::RuntimeFilterConsumer {
 public:
     ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
-            : PipelineXLocalState<>(state, parent),
+            : PipelineXLocalState<ScanDependency>(state, parent),
               vectorized::RuntimeFilterConsumer(parent->node_id(), 
parent->runtime_filter_descs(),
                                                 parent->row_descriptor(), 
_conjuncts) {}
     virtual ~ScanLocalStateBase() = default;
@@ -211,8 +214,6 @@ class ScanLocalState : public ScanLocalStateBase {
 
     int64_t get_push_down_count() override;
 
-    Dependency* dependency() override { return _scan_dependency.get(); }
-
     RuntimeFilterDependency* filterdependency() override { return 
_filter_dependency.get(); };
     Dependency* finishdependency() override { return _finish_dependency.get(); 
}
 
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index d40f7f6adb3..39598dadbd8 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -622,7 +622,7 @@ template class StatefulOperatorX<RepeatLocalState>;
 template class StatefulOperatorX<NestedLoopJoinProbeLocalState>;
 template class StatefulOperatorX<TableFunctionLocalState>;
 
-template class PipelineXSinkLocalState<HashJoinBuildSinkDependency>;
+template class PipelineXSinkLocalState<SharedHashTableDependency>;
 template class PipelineXSinkLocalState<SortSinkDependency>;
 template class PipelineXSinkLocalState<NestedLoopJoinBuildSinkDependency>;
 template class PipelineXSinkLocalState<AnalyticSinkDependency>;
@@ -635,6 +635,7 @@ template class PipelineXSinkLocalState<SetSinkDependency>;
 template class PipelineXSinkLocalState<SetProbeSinkDependency>;
 template class PipelineXSinkLocalState<LocalExchangeSinkDependency>;
 template class PipelineXSinkLocalState<AndDependency>;
+template class PipelineXSinkLocalState<ResultSinkDependency>;
 
 template class PipelineXLocalState<HashJoinProbeDependency>;
 template class PipelineXLocalState<SortSourceDependency>;
@@ -648,6 +649,7 @@ template class 
PipelineXLocalState<PartitionSortSourceDependency>;
 template class PipelineXLocalState<SetSourceDependency>;
 template class PipelineXLocalState<LocalExchangeSourceDependency>;
 template class PipelineXLocalState<AndDependency>;
+template class PipelineXLocalState<ScanDependency>;
 
 template class AsyncWriterSink<doris::vectorized::VFileResultWriter, 
ResultFileSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, 
JdbcTableSinkOperatorX>;
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index da52706b56c..c4e0a7e94a0 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -348,6 +348,10 @@ public:
 
     Dependency* dependency() override { return _dependency; }
 
+    auto dependency_sptr() {
+        return 
std::dynamic_pointer_cast<DependencyArg>(_dependency->shared_from_this());
+    }
+
 protected:
     DependencyType* _dependency = nullptr;
     typename DependencyType::SharedState* _shared_state = nullptr;
@@ -608,6 +612,10 @@ public:
 
     Dependency* dependency() override { return _dependency; }
 
+    auto dependency_sptr() {
+        return 
std::dynamic_pointer_cast<DependencyArg>(_dependency->shared_from_this());
+    }
+
 protected:
     DependencyType* _dependency = nullptr;
     typename DependencyType::SharedState* _shared_state = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to