This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3273e0e6353 [refactor](pipelineX)do not override dependency() function
in pipelineX (#28848)
3273e0e6353 is described below
commit 3273e0e6353e4c9c03b5dc44912b0ceee3bbcdb1
Author: Mryange <[email protected]>
AuthorDate: Mon Dec 25 10:36:31 2023 +0800
[refactor](pipelineX)do not override dependency() function in pipelineX
(#28848)
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 6 +++---
be/src/pipeline/exec/hashjoin_build_sink.h | 15 ++++-----------
be/src/pipeline/exec/join_build_sink_operator.cpp | 2 +-
be/src/pipeline/exec/result_sink_operator.cpp | 15 +++++++--------
be/src/pipeline/exec/result_sink_operator.h | 9 +++++----
be/src/pipeline/exec/scan_operator.cpp | 11 +++++------
be/src/pipeline/exec/scan_operator.h | 9 +++++----
be/src/pipeline/pipeline_x/operator.cpp | 4 +++-
be/src/pipeline/pipeline_x/operator.h | 8 ++++++++
9 files changed, 41 insertions(+), 38 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 8cd0376a957..8c1ff852433 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -22,6 +22,7 @@
#include "exprs/bloom_filter_func.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_x/dependency.h"
#include "vec/exec/join/vhash_join_node.h"
#include "vec/utils/template_helpers.hpp"
@@ -45,8 +46,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
- _shared_hash_table_dependency = SharedHashTableDependency::create_shared(
- _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
+ _shared_hash_table_dependency = dependency_sptr();
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
@@ -119,7 +119,7 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState*
state) {
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
- if (auto bf = _runtime_filters[i]->get_bloomfilter()) {
+ if (auto* bf = _runtime_filters[i]->get_bloomfilter()) {
RETURN_IF_ERROR(bf->init_with_fixed_length());
}
}
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 24faa4115dd..b5ae146c182 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -48,24 +48,18 @@ class HashJoinBuildSinkOperatorX;
class SharedHashTableDependency final : public Dependency {
public:
+ using SharedState = HashJoinSharedState;
ENABLE_FACTORY_CREATOR(SharedHashTableDependency);
SharedHashTableDependency(int id, int node_id, QueryContext* query_ctx)
- : Dependency(id, node_id, "SharedHashTableDependency", true,
query_ctx) {}
+ : Dependency(id, node_id, "SharedHashTableBuildDependency", true,
query_ctx) {}
~SharedHashTableDependency() override = default;
};
-class HashJoinBuildSinkDependency final : public Dependency {
-public:
- using SharedState = HashJoinSharedState;
- HashJoinBuildSinkDependency(int id, int node_id, QueryContext* query_ctx)
- : Dependency(id, node_id, "HashJoinBuildSinkDependency", true,
query_ctx) {}
- ~HashJoinBuildSinkDependency() override = default;
-};
-
class HashJoinBuildSinkLocalState final
- : public JoinBuildSinkLocalState<HashJoinBuildSinkDependency,
HashJoinBuildSinkLocalState> {
+ : public JoinBuildSinkLocalState<SharedHashTableDependency,
HashJoinBuildSinkLocalState> {
public:
ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState);
+ using Base = JoinBuildSinkLocalState<SharedHashTableDependency,
HashJoinBuildSinkLocalState>;
using Parent = HashJoinBuildSinkOperatorX;
HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState*
state);
~HashJoinBuildSinkLocalState() override = default;
@@ -86,7 +80,6 @@ public:
void add_hash_buckets_filled_info(const std::string& info) const {
_profile->add_info_string("HashTableFilledBuckets", info);
}
- Dependency* dependency() override { return
_shared_hash_table_dependency.get(); }
protected:
void _hash_table_init(RuntimeState* state);
diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp
b/be/src/pipeline/exec/join_build_sink_operator.cpp
index e2cc361c22f..798b5d86b00 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.cpp
+++ b/be/src/pipeline/exec/join_build_sink_operator.cpp
@@ -115,7 +115,7 @@ void
JoinBuildSinkOperatorX<LocalStateType>::_init_join_op() {
}
template class JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>;
-template class JoinBuildSinkLocalState<HashJoinBuildSinkDependency,
HashJoinBuildSinkLocalState>;
+template class JoinBuildSinkLocalState<SharedHashTableDependency,
HashJoinBuildSinkLocalState>;
template class JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>;
template class JoinBuildSinkLocalState<NestedLoopJoinBuildSinkDependency,
NestedLoopJoinBuildSinkLocalState>;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 33256dc7f00..dcf0d996c62 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -51,7 +51,7 @@ bool ResultSinkOperator::can_write() {
}
Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo&
info) {
- RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
+ RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
static const std::string timer_name = "WaitForDependencyTime";
@@ -62,18 +62,17 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(),
vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
state->execution_timeout()));
- _result_sink_dependency = ResultSinkDependency::create_shared(
- _parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
+ auto result_sink_dependency = dependency_sptr();
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced",
TUnit::UNIT, 1);
_rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced",
TUnit::UNIT, 1);
-
((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency);
+
((PipBufferControlBlock*)_sender.get())->set_dependency(result_sink_dependency);
return Status::OK();
}
Status ResultSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
- RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
+ RETURN_IF_ERROR(Base::open(state));
auto& p = _parent->cast<ResultSinkOperatorX>();
_output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
@@ -150,7 +149,7 @@ Status
ResultSinkOperatorX::_second_phase_fetch_data(RuntimeState* state,
vectorized::Block*
final_block) {
auto row_id_col = final_block->get_by_position(final_block->columns() - 1);
CHECK(row_id_col.name == BeConsts::ROWID_COL);
- auto tuple_desc = _row_desc.tuple_descriptors()[0];
+ auto* tuple_desc = _row_desc.tuple_descriptors()[0];
FetchOption fetch_option;
fetch_option.desc = tuple_desc;
fetch_option.t_fetch_opt = _fetch_option;
@@ -167,7 +166,7 @@ Status ResultSinkLocalState::close(RuntimeState* state,
Status exec_status) {
}
SCOPED_TIMER(_close_timer);
SCOPED_TIMER(exec_time_counter());
- COUNTER_SET(_wait_for_dependency_timer,
_result_sink_dependency->watcher_elapse_time());
+ COUNTER_SET(_wait_for_dependency_timer,
_dependency->watcher_elapse_time());
Status final_status = exec_status;
if (_writer) {
// close the writer
@@ -189,7 +188,7 @@ Status ResultSinkLocalState::close(RuntimeState* state,
Status exec_status) {
static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
state->fragment_instance_id()));
- RETURN_IF_ERROR(PipelineXSinkLocalState<>::close(state, exec_status));
+ RETURN_IF_ERROR(Base::close(state, exec_status));
return final_status;
}
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index e3d56f176e6..bed6aed8969 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -20,6 +20,7 @@
#include <stdint.h>
#include "operator.h"
+#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/sink/vresult_sink.h"
@@ -45,23 +46,24 @@ public:
class ResultSinkDependency final : public Dependency {
public:
+ using SharedState = BasicSharedState;
ENABLE_FACTORY_CREATOR(ResultSinkDependency);
ResultSinkDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "ResultSinkDependency", true, query_ctx)
{}
~ResultSinkDependency() override = default;
};
-class ResultSinkLocalState final : public PipelineXSinkLocalState<> {
+class ResultSinkLocalState final : public
PipelineXSinkLocalState<ResultSinkDependency> {
ENABLE_FACTORY_CREATOR(ResultSinkLocalState);
+ using Base = PipelineXSinkLocalState<ResultSinkDependency>;
public:
ResultSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
- : PipelineXSinkLocalState<>(parent, state) {}
+ : Base(parent, state) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
- Dependency* dependency() override { return _result_sink_dependency.get(); }
RuntimeProfile::Counter* blocks_sent_counter() { return
_blocks_sent_counter; }
RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }
@@ -72,7 +74,6 @@ private:
std::shared_ptr<BufferControlBlock> _sender;
std::shared_ptr<ResultWriter> _writer;
- std::shared_ptr<ResultSinkDependency> _result_sink_dependency;
RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
RuntimeProfile::Counter* _rows_sent_counter = nullptr;
};
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 05d9c7292f7..af9529798a3 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -121,15 +121,13 @@ bool ScanLocalState<Derived>::should_run_serial() const {
template <typename Derived>
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo&
info) {
- RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+ RETURN_IF_ERROR(PipelineXLocalState<ScanDependency>::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<typename Derived::Parent>();
RETURN_IF_ERROR(RuntimeFilterConsumer::init(state,
p.ignore_data_distribution()));
- _scan_dependency =
ScanDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
-
PipelineXLocalState<>::_parent->node_id(),
- state->get_query_ctx());
+ _scan_dependency = dependency_sptr();
set_scan_ranges(state, info.scan_ranges);
_common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
@@ -567,7 +565,8 @@ template <typename Derived>
std::string ScanLocalState<Derived>::debug_string(int indentation_level) const
{
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}, _eos = {}",
- PipelineXLocalState<>::debug_string(indentation_level),
_eos.load());
+
PipelineXLocalState<ScanDependency>::debug_string(indentation_level),
+ _eos.load());
if (_scanner_ctx) {
fmt::format_to(debug_string_buffer, "");
fmt::format_to(
@@ -1457,7 +1456,7 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
COUNTER_SET(_wait_for_finish_dependency_timer,
_finish_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
- return PipelineXLocalState<>::close(state);
+ return PipelineXLocalState<ScanDependency>::close(state);
}
template <typename LocalStateType>
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 3690e9eb39c..afbcaea0a63 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -24,6 +24,7 @@
#include "common/status.h"
#include "operator.h"
+#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/pipeline_x/operator.h"
#include "runtime/descriptors.h"
#include "vec/exec/scan/vscan_node.h"
@@ -59,6 +60,7 @@ public:
class ScanDependency final : public Dependency {
public:
+ using SharedState = FakeSharedState;
ENABLE_FACTORY_CREATOR(ScanDependency);
ScanDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "ScanDependency", query_ctx) {}
@@ -98,10 +100,11 @@ private:
std::mutex _always_done_lock;
};
-class ScanLocalStateBase : public PipelineXLocalState<>, public
vectorized::RuntimeFilterConsumer {
+class ScanLocalStateBase : public PipelineXLocalState<ScanDependency>,
+ public vectorized::RuntimeFilterConsumer {
public:
ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
- : PipelineXLocalState<>(state, parent),
+ : PipelineXLocalState<ScanDependency>(state, parent),
vectorized::RuntimeFilterConsumer(parent->node_id(),
parent->runtime_filter_descs(),
parent->row_descriptor(),
_conjuncts) {}
virtual ~ScanLocalStateBase() = default;
@@ -211,8 +214,6 @@ class ScanLocalState : public ScanLocalStateBase {
int64_t get_push_down_count() override;
- Dependency* dependency() override { return _scan_dependency.get(); }
-
RuntimeFilterDependency* filterdependency() override { return
_filter_dependency.get(); };
Dependency* finishdependency() override { return _finish_dependency.get();
}
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index d40f7f6adb3..39598dadbd8 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -622,7 +622,7 @@ template class StatefulOperatorX<RepeatLocalState>;
template class StatefulOperatorX<NestedLoopJoinProbeLocalState>;
template class StatefulOperatorX<TableFunctionLocalState>;
-template class PipelineXSinkLocalState<HashJoinBuildSinkDependency>;
+template class PipelineXSinkLocalState<SharedHashTableDependency>;
template class PipelineXSinkLocalState<SortSinkDependency>;
template class PipelineXSinkLocalState<NestedLoopJoinBuildSinkDependency>;
template class PipelineXSinkLocalState<AnalyticSinkDependency>;
@@ -635,6 +635,7 @@ template class PipelineXSinkLocalState<SetSinkDependency>;
template class PipelineXSinkLocalState<SetProbeSinkDependency>;
template class PipelineXSinkLocalState<LocalExchangeSinkDependency>;
template class PipelineXSinkLocalState<AndDependency>;
+template class PipelineXSinkLocalState<ResultSinkDependency>;
template class PipelineXLocalState<HashJoinProbeDependency>;
template class PipelineXLocalState<SortSourceDependency>;
@@ -648,6 +649,7 @@ template class
PipelineXLocalState<PartitionSortSourceDependency>;
template class PipelineXLocalState<SetSourceDependency>;
template class PipelineXLocalState<LocalExchangeSourceDependency>;
template class PipelineXLocalState<AndDependency>;
+template class PipelineXLocalState<ScanDependency>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter,
ResultFileSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter,
JdbcTableSinkOperatorX>;
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index da52706b56c..c4e0a7e94a0 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -348,6 +348,10 @@ public:
Dependency* dependency() override { return _dependency; }
+ auto dependency_sptr() {
+ return
std::dynamic_pointer_cast<DependencyArg>(_dependency->shared_from_this());
+ }
+
protected:
DependencyType* _dependency = nullptr;
typename DependencyType::SharedState* _shared_state = nullptr;
@@ -608,6 +612,10 @@ public:
Dependency* dependency() override { return _dependency; }
+ auto dependency_sptr() {
+ return
std::dynamic_pointer_cast<DependencyArg>(_dependency->shared_from_this());
+ }
+
protected:
DependencyType* _dependency = nullptr;
typename DependencyType::SharedState* _shared_state = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]