This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 78165a3f6b6 [refine](pipelineX) use finish dependency in task (#25881)
78165a3f6b6 is described below
commit 78165a3f6b6000445777f487ad596447407a6e5a
Author: Mryange <[email protected]>
AuthorDate: Thu Oct 26 10:32:13 2023 +0800
[refine](pipelineX) use finish dependency in task (#25881)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 5 ---
be/src/pipeline/exec/exchange_sink_operator.h | 1 -
be/src/pipeline/exec/jdbc_table_sink_operator.cpp | 5 ---
be/src/pipeline/exec/jdbc_table_sink_operator.h | 2 --
.../exec/multi_cast_data_stream_source.cpp | 2 ++
.../pipeline/exec/multi_cast_data_stream_source.h | 6 ----
be/src/pipeline/exec/olap_table_sink_operator.h | 6 ----
be/src/pipeline/exec/result_file_sink_operator.cpp | 6 ----
be/src/pipeline/exec/result_file_sink_operator.h | 2 --
be/src/pipeline/exec/scan_operator.cpp | 17 ++--------
be/src/pipeline/exec/scan_operator.h | 4 ---
be/src/pipeline/pipeline_x/dependency.h | 36 +++++++++++++++++++---
be/src/pipeline/pipeline_x/operator.cpp | 9 ++++--
be/src/pipeline/pipeline_x/operator.h | 12 ++++----
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 11 +++++++
be/src/pipeline/pipeline_x/pipeline_x_task.h | 23 +++++++++-----
16 files changed, 74 insertions(+), 73 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 851c9477966..0988052a981 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -510,9 +510,4 @@ Status ExchangeSinkLocalState::close(RuntimeState* state,
Status exec_status) {
return PipelineXSinkLocalState<>::close(state, exec_status);
}
-FinishDependency* ExchangeSinkOperatorX::finish_blocked_by(RuntimeState*
state) const {
- auto& local_state =
state->get_sink_local_state(operator_id())->cast<ExchangeSinkLocalState>();
- return local_state._finish_dependency->finish_blocked_by();
-}
-
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 5ef3d9ee715..69fa4da4be4 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -232,7 +232,6 @@ public:
int num_receivers = 1);
Status try_close(RuntimeState* state, Status exec_status) override;
- FinishDependency* finish_blocked_by(RuntimeState* state) const override;
private:
friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
index 139c6c434a8..ae8fcba3d1c 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -63,9 +63,4 @@ Status JdbcTableSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* bloc
return Status::OK();
}
-FinishDependency* JdbcTableSinkOperatorX::finish_blocked_by(RuntimeState*
state) const {
- auto& local_state =
state->get_sink_local_state(operator_id())->cast<JdbcTableSinkLocalState>();
- return local_state._finish_dependency->finish_blocked_by();
-}
-
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h b/be/src/pipeline/exec/jdbc_table_sink_operator.h
index 02ccd00fa3f..41348ccaccf 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.h
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h
@@ -55,8 +55,6 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
- FinishDependency* finish_blocked_by(RuntimeState* state) const override;
-
private:
friend class JdbcTableSinkLocalState;
template <typename Writer, typename Parent>
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 1d3565c65b5..dd1c1e46d21 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -145,6 +145,8 @@ Status
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
}
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
+ _filter_dependency->set_filter_blocked_by_fn(
+ [this]() { return this->runtime_filters_are_ready_or_timeout(); });
return Status::OK();
}
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index ee724b1ff1d..aafacf9d87d 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -171,12 +171,6 @@ public:
int dest_id_from_sink() const { return _t_data_stream_sink.dest_node_id; }
- bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const
override {
- return state->get_local_state(operator_id())
- ->template cast<MultiCastDataStreamSourceLocalState>()
- .runtime_filters_are_ready_or_timeout();
- }
-
private:
friend class MultiCastDataStreamSourceLocalState;
const int _consumer_id;
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h
index 6c84f3ef523..877a42937da 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -100,12 +100,6 @@ public:
return local_state.sink(state, in_block, source_state);
}
- FinishDependency* finish_blocked_by(RuntimeState* state) const override {
- auto& local_state =
-
state->get_sink_local_state(operator_id())->cast<OlapTableSinkLocalState>();
- return local_state._finish_dependency->finish_blocked_by();
- };
-
private:
friend class OlapTableSinkLocalState;
template <typename Writer, typename Parent>
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 79380642863..ea765055cda 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -269,10 +269,4 @@ Status ResultFileSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* in_
return local_state.sink(state, in_block, source_state);
}
-FinishDependency* ResultFileSinkOperatorX::finish_blocked_by(RuntimeState*
state) const {
- auto& local_state =
-
state->get_sink_local_state(operator_id())->cast<ResultFileSinkLocalState>();
- return local_state._finish_dependency->finish_blocked_by();
-}
-
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h
index f80093428cf..3a98401de60 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -94,8 +94,6 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
- FinishDependency* finish_blocked_by(RuntimeState* state) const override;
-
private:
friend class ResultFileSinkLocalState;
template <typename Writer, typename Parent>
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index adee7e7d0b3..38f7864db97 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -128,7 +128,8 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
_source_dependency->add_child(_open_dependency);
_eos_dependency =
EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
_source_dependency->add_child(_eos_dependency);
-
+ _filter_dependency->set_filter_blocked_by_fn(
+ [this]() { return this->runtime_filters_are_ready_or_timeout(); });
auto& p = _parent->cast<typename Derived::Parent>();
set_scan_ranges(info.scan_ranges);
_common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
@@ -1283,12 +1284,6 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool*
pool, const TPlanNode&
}
}
-template <typename LocalStateType>
-FinishDependency*
ScanOperatorX<LocalStateType>::finish_blocked_by(RuntimeState* state) const {
- auto& local_state = state->get_local_state(operator_id())->template
cast<LocalStateType>();
- return local_state._finish_dependency->finish_blocked_by();
-}
-
template <typename LocalStateType>
Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode,
RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<LocalStateType>::init(tnode, state));
@@ -1370,14 +1365,6 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
return PipelineXLocalState<>::close(state);
}
-template <typename LocalStateType>
-bool ScanOperatorX<LocalStateType>::runtime_filters_are_ready_or_timeout(
- RuntimeState* state) const {
- return state->get_local_state(operator_id())
- ->template cast<LocalStateType>()
- .runtime_filters_are_ready_or_timeout();
-}
-
template <typename LocalStateType>
Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 4355efed8a6..022b57ee9bf 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -414,12 +414,8 @@ protected:
template <typename LocalStateType>
class ScanOperatorX : public OperatorX<LocalStateType> {
public:
- bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const
override;
-
Status try_close(RuntimeState* state) override;
- FinishDependency* finish_blocked_by(RuntimeState* state) const override;
-
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override { return
OperatorXBase::prepare(state); }
Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 488c33c5a43..028b78957b4 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -152,7 +152,8 @@ protected:
class FinishDependency final : public Dependency {
public:
- FinishDependency(int id, std::string name) : Dependency(id, name),
_ready_to_finish(true) {}
+ FinishDependency(int id, int node_id, std::string name)
+ : Dependency(id, name), _ready_to_finish(true), _node_id(node_id)
{}
~FinishDependency() override = default;
void start_finish_watcher() {
@@ -162,15 +163,15 @@ public:
_finish_dependency_watcher.start();
}
- [[nodiscard]] virtual int64_t finish_watcher_elapse_time() {
+ [[nodiscard]] int64_t finish_watcher_elapse_time() {
return _finish_dependency_watcher.elapsed_time();
}
- [[nodiscard]] virtual FinishDependency* finish_blocked_by() {
+ [[nodiscard]] FinishDependency* finish_blocked_by() {
if (config::enable_fuzzy_mode && !_ready_to_finish &&
_finish_dependency_watcher.elapsed_time() >
SLOW_DEPENDENCY_THRESHOLD) {
LOG(WARNING) << "========Dependency may be blocked by some
reasons: " << name() << " "
- << id();
+ << _node_id;
}
return _ready_to_finish ? nullptr : this;
}
@@ -190,6 +191,33 @@ public:
protected:
std::atomic<bool> _ready_to_finish;
MonotonicStopWatch _finish_dependency_watcher;
+ const int _node_id;
+};
+
+class FilterDependency final : public Dependency {
+public:
+ FilterDependency(int id, int node_id, std::string name)
+ : Dependency(id, name),
+ _runtime_filters_are_ready_or_timeout(nullptr),
+ _node_id(node_id) {}
+
+ FilterDependency* filter_blocked_by() {
+ if (!_runtime_filters_are_ready_or_timeout) {
+ return nullptr;
+ }
+ if (!_runtime_filters_are_ready_or_timeout()) {
+ return this;
+ }
+ return nullptr;
+ }
+ void* shared_state() override { return nullptr; }
+ void set_filter_blocked_by_fn(std::function<bool()> call_fn) {
+ _runtime_filters_are_ready_or_timeout = call_fn;
+ }
+
+protected:
+ std::function<bool()> _runtime_filters_are_ready_or_timeout;
+ const int _node_id;
};
class AndDependency final : public WriteDependency {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 7fa8a14993e..9377f5f2701 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -303,7 +303,7 @@
PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase*
RuntimeState* state)
: _parent(parent),
_state(state),
- _finish_dependency(new FinishDependency(parent->operator_id(),
+ _finish_dependency(new FinishDependency(parent->operator_id(),
parent->node_id(),
parent->get_name() +
"_FINISH_DEPENDENCY")) {}
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state,
OperatorXBase* parent)
@@ -312,8 +312,11 @@
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB
_peak_memory_usage_counter(nullptr),
_parent(parent),
_state(state),
- _finish_dependency(new FinishDependency(parent->operator_id(),
- parent->get_name() +
"_FINISH_DEPENDENCY")) {}
+ _finish_dependency(new FinishDependency(parent->operator_id(),
parent->node_id(),
+ parent->get_name() +
"_FINISH_DEPENDENCY")) {
+ _filter_dependency = std::make_unique<FilterDependency>(
+ parent->operator_id(), parent->node_id(), parent->get_name() +
"_FILTER_DEPENDENCY");
+}
template <typename DependencyType>
Status PipelineXLocalState<DependencyType>::init(RuntimeState* state,
LocalStateInfo& info) {
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index a6ef8a81acb..bfad246d3e6 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -99,6 +99,9 @@ public:
virtual Dependency* dependency() { return nullptr; }
+ FinishDependency* finishdependency() { return _finish_dependency.get(); }
+ FilterDependency* filterdependency() { return _filter_dependency.get(); }
+
protected:
friend class OperatorXBase;
@@ -130,6 +133,7 @@ protected:
bool _closed = false;
vectorized::Block _origin_block;
std::shared_ptr<FinishDependency> _finish_dependency;
+ std::unique_ptr<FilterDependency> _filter_dependency;
};
class OperatorXBase : public OperatorBase {
@@ -202,12 +206,8 @@ public:
return true;
}
- virtual bool runtime_filters_are_ready_or_timeout(RuntimeState* state)
const { return true; }
-
Status close(RuntimeState* state) override;
- virtual FinishDependency* finish_blocked_by(RuntimeState* state) const {
return nullptr; }
-
[[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const {
return _row_descriptor;
}
@@ -368,6 +368,8 @@ public:
virtual WriteDependency* dependency() { return nullptr; }
+ FinishDependency* finishdependency() { return _finish_dependency.get(); }
+
protected:
DataSinkOperatorXBase* _parent;
RuntimeState* _state;
@@ -469,8 +471,6 @@ public:
return false;
}
- virtual FinishDependency* finish_blocked_by(RuntimeState* state) const {
return nullptr; }
-
[[nodiscard]] std::string debug_string() const override { return ""; }
[[nodiscard]] virtual std::string debug_string(int indentation_level)
const;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 30be4f6240d..9ad91341ce5 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -108,6 +108,8 @@ Status PipelineXTask::extract_dependencies() {
auto* dep = local_state->dependency();
DCHECK(dep != nullptr);
_read_dependencies.push_back(dep);
+ auto* fin_dep = local_state->finishdependency();
+ _finish_dependencies.push_back(fin_dep);
}
{
auto result =
_state->get_sink_local_state_result(_sink->operator_id());
@@ -118,6 +120,15 @@ Status PipelineXTask::extract_dependencies() {
auto* dep = local_state->dependency();
DCHECK(dep != nullptr);
_write_dependencies = dep;
+ auto* fin_dep = local_state->finishdependency();
+ _finish_dependencies.push_back(fin_dep);
+ }
+ {
+ auto result = _state->get_local_state_result(_source->operator_id());
+ if (!result) {
+ return result.error();
+ }
+ _filter_dependency = result.value()->filterdependency();
}
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 0120cfa2e54..5155c2fe766 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -78,6 +78,7 @@ public:
auto* dep = op_dep->read_blocked_by();
if (dep != nullptr) {
dep->start_read_watcher();
+ push_blocked_task_to_dependency(dep);
return false;
}
}
@@ -85,13 +86,19 @@ public:
}
bool runtime_filters_are_ready_or_timeout() override {
- return _source->runtime_filters_are_ready_or_timeout(_state);
+ auto* dep = _filter_dependency->filter_blocked_by();
+ if (dep != nullptr) {
+ push_blocked_task_to_dependency(dep);
+ return false;
+ }
+ return true;
}
bool sink_can_write() override {
auto* dep = _write_dependencies->write_blocked_by();
if (dep != nullptr) {
dep->start_write_watcher();
+ push_blocked_task_to_dependency(dep);
return false;
}
return true;
@@ -102,18 +109,14 @@ public:
std::string debug_string() override;
bool is_pending_finish() override {
- for (auto& op : _operators) {
- auto dep = op->finish_blocked_by(_state);
+ for (auto* fin_dep : _finish_dependencies) {
+ auto* dep = fin_dep->finish_blocked_by();
if (dep != nullptr) {
dep->start_finish_watcher();
+ push_blocked_task_to_dependency(dep);
return true;
}
}
- auto dep = _sink->finish_blocked_by(_state);
- if (dep != nullptr) {
- dep->start_finish_watcher();
- return true;
- }
return false;
}
@@ -146,6 +149,8 @@ public:
Status extract_dependencies();
+ void push_blocked_task_to_dependency(Dependency* dep) {}
+
private:
void set_close_pipeline_time() override {}
void _init_profile() override;
@@ -160,6 +165,8 @@ private:
std::vector<Dependency*> _read_dependencies;
WriteDependency* _write_dependencies;
+ std::vector<FinishDependency*> _finish_dependencies;
+ FilterDependency* _filter_dependency;
DependencyMap _upstream_dependency;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]