This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 2975ef9d9b [Opt](pipeline) Refactor the short circuit of join pipeline (#23639) (#23728)
2975ef9d9b is described below
commit 2975ef9d9b7959da89724e6297edc16ea16063e8
Author: HappenLee <[email protected]>
AuthorDate: Fri Sep 1 07:58:28 2023 +0800
[Opt](pipeline) Refactor the short circuit of join pipeline (#23639) (#23728)
* [Opt](pipeline) Refactor the short circuit of join pipeline
---
be/src/pipeline/pipeline.cpp | 6 +--
be/src/pipeline/pipeline.h | 77 ++++++++++++++++++++++-----
be/src/pipeline/pipeline_fragment_context.cpp | 8 ++-
be/src/pipeline/pipeline_task.cpp | 12 +++--
be/src/pipeline/pipeline_task.h | 48 ++---------------
5 files changed, 83 insertions(+), 68 deletions(-)
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index ccc7786442..80efbd154a 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -32,14 +32,14 @@ void Pipeline::_init_profile() {
_pipeline_profile.reset(new RuntimeProfile(ss.str()));
}
-Status Pipeline::build_operators(Operators& operators) {
+Status Pipeline::build_operators() {
OperatorPtr pre;
for (auto& operator_t : _operator_builders) {
auto o = operator_t->build_operator();
if (pre) {
o->set_child(pre);
}
- operators.emplace_back(o);
+ _operators.emplace_back(o);
pre = std::move(o);
}
return Status::OK();
@@ -64,4 +64,4 @@ Status Pipeline::set_sink(OperatorBuilderPtr& sink_) {
return Status::OK();
}
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 73b2c3850c..759e4fce2c 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -45,31 +45,52 @@ using PipelineId = uint32_t;
class Pipeline : public std::enable_shared_from_this<Pipeline> {
friend class PipelineTask;
+ friend class PipelineXTask;
public:
Pipeline() = delete;
explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
- : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {
+ : _pipeline_id(pipeline_id), _context(context) {
_init_profile();
}
void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
- pipeline->_parents.push_back(weak_from_this());
- _dependencies.push_back(pipeline);
+ pipeline->_parents.push_back({_operator_builders.size(), weak_from_this()});
+ _dependencies.push_back({_operator_builders.size(), pipeline});
}
// If all dependencies are finished, this pipeline task should be scheduled.
// e.g. Hash join probe task will be scheduled once Hash join build task is finished.
- bool finish_one_dependency(int dependency_core_id) {
- DCHECK(_complete_dependency < _dependencies.size());
- bool finish = _complete_dependency.fetch_add(1) == _dependencies.size() - 1;
- if (finish) {
+ void finish_one_dependency(int dep_opr, int dependency_core_id) {
+ std::lock_guard l(_depend_mutex);
+ if (!_operators.empty() && _operators[dep_opr - 1]->can_terminate_early()) {
+ _always_can_read = true;
+ _always_can_write = (dep_opr == _operators.size());
+
+ for (int i = 0; i < _dependencies.size(); ++i) {
+ if (dep_opr == _dependencies[i].first) {
+ _dependencies.erase(_dependencies.begin(), _dependencies.begin() + i + 1);
+ break;
+ }
+ }
+ } else {
+ for (int i = 0; i < _dependencies.size(); ++i) {
+ if (dep_opr == _dependencies[i].first) {
+ _dependencies.erase(_dependencies.begin() + i);
+ break;
+ }
+ }
+ }
+
+ if (_dependencies.empty()) {
_previous_schedule_id = dependency_core_id;
}
- return finish;
}
- bool has_dependency() { return _complete_dependency.load() < _dependencies.size(); }
+ bool has_dependency() {
+ std::lock_guard l(_depend_mutex);
+ return !_dependencies.empty();
+ }
Status add_operator(OperatorBuilderPtr& op);
@@ -77,25 +98,53 @@ public:
OperatorBuilderBase* sink() { return _sink.get(); }
- Status build_operators(Operators&);
+ Status build_operators();
RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
private:
void _init_profile();
- std::atomic<uint32_t> _complete_dependency;
OperatorBuilders _operator_builders; // left is _source, right is _root
OperatorBuilderPtr _sink; // put block to sink
- std::vector<std::weak_ptr<Pipeline>> _parents;
- std::vector<std::shared_ptr<Pipeline>> _dependencies;
+ std::mutex _depend_mutex;
+ std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
+ std::vector<std::pair<int, std::shared_ptr<Pipeline>>> _dependencies;
PipelineId _pipeline_id;
std::weak_ptr<PipelineFragmentContext> _context;
int _previous_schedule_id = -1;
std::unique_ptr<RuntimeProfile> _pipeline_profile;
+
+ Operators _operators;
+ /**
+ * Consider the query plan below:
+ *
+ * ExchangeSource JoinBuild1
+ * \ /
+ * JoinProbe1 (Right Outer) JoinBuild2
+ * \ /
+ * JoinProbe2 (Right Outer)
+ * |
+ * Sink
+ *
+ * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource
+ * because we have a determined conclusion that JoinProbe1/JoinProbe2 will also output 0 rows.
+ *
+ * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked by Sink because JoinProbe2 will
+ * produce more data.
+ *
+ * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be blocked by ExchangeSource
+ * and Sink because JoinProbe2 will always produce 0 rows and terminate early.
+ *
+ * In a nutshell, we should follow the rules:
+ * 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator.
+ * 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator.
+ */
+ bool _always_can_read = false;
+ bool _always_can_write = false;
};
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
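
As a reading aid, a minimal standalone sketch of the dependency bookkeeping this pipeline.h hunk introduces. Op and MiniPipeline below are simplified, hypothetical stand-ins rather than the real OperatorPtr/Pipeline classes: when a finished build side lets its probe operator terminate early, the pipeline latches the always-can-read / always-can-write flags described in the comment above and drops every dependency registered at or before that operator.

    // Minimal sketch, not Doris code: Op and MiniPipeline are hypothetical stand-ins.
    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <utility>
    #include <vector>

    struct Op {
        bool early = false; // e.g. a right-outer probe whose build side produced 0 rows
        bool can_terminate_early() const { return early; }
    };

    struct MiniPipeline {
        std::vector<std::shared_ptr<Op>> operators;
        // first = 1-based offset of the operator that owns the dependency,
        // second = an id standing in for the upstream (build) pipeline.
        std::vector<std::pair<size_t, int>> dependencies;
        bool always_can_read = false;
        bool always_can_write = false;
        std::mutex mu;

        void finish_one_dependency(size_t dep_opr) {
            std::lock_guard<std::mutex> l(mu);
            if (!operators.empty() && operators[dep_opr - 1]->can_terminate_early()) {
                always_can_read = true;                           // rule 1: never block on the source
                always_can_write = (dep_opr == operators.size()); // rule 2: last operator before the sink
                // every dependency at or before this operator is now irrelevant
                for (size_t i = 0; i < dependencies.size(); ++i) {
                    if (dependencies[i].first == dep_opr) {
                        dependencies.erase(dependencies.begin(), dependencies.begin() + i + 1);
                        break;
                    }
                }
            } else {
                // normal case: only this one dependency is finished
                for (size_t i = 0; i < dependencies.size(); ++i) {
                    if (dependencies[i].first == dep_opr) {
                        dependencies.erase(dependencies.begin() + i);
                        break;
                    }
                }
            }
        }

        bool has_dependency() {
            std::lock_guard<std::mutex> l(mu);
            return !dependencies.empty();
        }
    };

    int main() {
        // Probe pipeline of the plan in the comment: ExchangeSource -> JoinProbe1 -> JoinProbe2.
        MiniPipeline p;
        p.operators = {std::make_shared<Op>(), std::make_shared<Op>(), std::make_shared<Op>()};
        p.dependencies = {{2, /*JoinBuild1*/ 1}, {3, /*JoinBuild2*/ 2}};

        p.operators[1]->early = true;           // JoinBuild1 produced 0 rows, JoinProbe1 short-circuits
        p.finish_one_dependency(/*dep_opr=*/2);

        std::cout << std::boolalpha
                  << "always_can_read=" << p.always_can_read     // true (rule 1)
                  << " always_can_write=" << p.always_can_write  // false, JoinProbe1 is not the last operator
                  << " has_dependency=" << p.has_dependency()    // true, JoinBuild2 is still pending
                  << "\n";
    }

In the real patch the flags live on Pipeline and are consulted by PipelineTask::source_can_read()/sink_can_write(), as the pipeline_task.h hunk below shows.
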
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index a5a93775a9..0e84131401 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -337,11 +337,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
// TODO pipeline 1 need to add new interface for exec node and operator
sink->init(request.fragment.output_sink);
- Operators operators;
- RETURN_IF_ERROR(pipeline->build_operators(operators));
- auto task =
- std::make_unique<PipelineTask>(pipeline, _total_tasks++, _runtime_state.get(),
- operators, sink, this, pipeline->pipeline_profile());
+ RETURN_IF_ERROR(pipeline->build_operators());
+ auto task = std::make_unique<PipelineTask>(pipeline, _total_tasks++, _runtime_state.get(),
+ sink, this, pipeline->pipeline_profile());
sink->set_child(task->get_root());
_tasks.emplace_back(std::move(task));
_runtime_profile->add_child(pipeline->pipeline_profile(), true, nullptr);
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 645028a3dc..7a16c07d13 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -44,12 +44,18 @@ class TaskGroup;
namespace doris::pipeline {
PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
- Operators& operators, OperatorPtr& sink,
- PipelineFragmentContext* fragment_context,
+ OperatorPtr& sink, PipelineFragmentContext* fragment_context,
RuntimeProfile* parent_profile)
: _index(index),
_pipeline(pipeline),
- _operators(operators),
+ _prepared(false),
+ _opened(false),
+ _state(state),
+ _cur_state(PipelineTaskState::NOT_READY),
+ _data_state(SourceState::DEPEND_ON_SOURCE),
+ _fragment_context(fragment_context),
+ _parent_profile(parent_profile),
+ _operators(pipeline->_operators),
_source(_operators.front()),
_root(_operators.back()),
_sink(sink),
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 34382a3f7c..5e7b436202 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -110,9 +110,8 @@ class TaskQueue;
// The class do the pipeline task. Minest schdule union by task scheduler
class PipelineTask {
public:
- PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, Operators& operators,
- OperatorPtr& sink, PipelineFragmentContext* fragment_context,
- RuntimeProfile* parent_profile);
+ PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, OperatorPtr& sink,
+ PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile);
Status prepare(RuntimeState* state);
@@ -152,50 +151,13 @@ public:
return false;
}
- bool source_can_read() {
- return _source->can_read() || ignore_blocking_source();
- ;
- }
+ bool source_can_read() { return _source->can_read() || _pipeline->_always_can_read; }
bool runtime_filters_are_ready_or_timeout() {
return _source->runtime_filters_are_ready_or_timeout();
}
- bool sink_can_write() { return _sink->can_write() || ignore_blocking_sink(); }
- /**
- * Consider the query plan below:
- *
- * ExchangeSource JoinBuild1
- * \ /
- * JoinProbe1 (Right Outer) JoinBuild2
- * \ /
- * JoinProbe2 (Right Outer)
- * |
- * Sink
- *
- * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource
- * because we have a determined conclusion that JoinProbe1/JoinProbe2 will also output 0 rows.
- *
- * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked by Sink because JoinProbe2 will
- * produce more data.
- *
- * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be blocked by ExchangeSource
- * and Sink because JoinProbe2 will always produce 0 rows and terminate early.
- *
- * In a nutshell, we should follow the rules:
- * 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator.
- * 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator.
- */
- [[nodiscard]] bool ignore_blocking_sink() { return _root->can_terminate_early(); }
-
- [[nodiscard]] bool ignore_blocking_source() {
- for (size_t i = 1; i < _operators.size(); i++) {
- if (_operators[i]->can_terminate_early()) {
- return true;
- }
- }
- return false;
- }
+ bool sink_can_write() { return _sink->can_write() || _pipeline->_always_can_write; }
Status finalize();
@@ -283,7 +245,7 @@ public:
private:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
- p.lock()->finish_one_dependency(_previous_schedule_id);
+ p.second.lock()->finish_one_dependency(p.first, _previous_schedule_id);
}
}
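
To make the scheduling effect concrete, a small sketch with hypothetical MiniOp/MiniPipelineFlags/MiniTask stand-ins (not the real PipelineTask): before this patch every poll of source_can_read()/sink_can_write() rescanned the operator list via ignore_blocking_source()/ignore_blocking_sink(); after it, the scan happens once in Pipeline::finish_one_dependency() and the readiness checks just OR in the precomputed flags.

    // Minimal sketch, hypothetical stand-ins, not Doris code.
    #include <cstddef>
    #include <memory>
    #include <vector>

    struct MiniOp {
        bool readable = false;
        bool writable = false;
        bool early = false;
        bool can_read() const { return readable; }
        bool can_write() const { return writable; }
        bool can_terminate_early() const { return early; }
    };

    struct MiniPipelineFlags {
        // Latched once by the dependency bookkeeping sketched earlier.
        bool always_can_read = false;
        bool always_can_write = false;
    };

    struct MiniTask {
        std::shared_ptr<MiniPipelineFlags> pipeline;
        std::vector<std::shared_ptr<MiniOp>> operators; // operators.front() is the source
        std::shared_ptr<MiniOp> sink;

        // Old behaviour (removed above): every readiness poll walked the operator list.
        bool ignore_blocking_source_old() const {
            for (size_t i = 1; i < operators.size(); ++i) {
                if (operators[i]->can_terminate_early()) return true;
            }
            return false;
        }

        // New behaviour: OR in flags computed once on the pipeline.
        bool source_can_read() const {
            return operators.front()->can_read() || pipeline->always_can_read;
        }
        bool sink_can_write() const {
            return sink->can_write() || pipeline->always_can_write;
        }
    };

    int main() {
        MiniTask t{std::make_shared<MiniPipelineFlags>(),
                   {std::make_shared<MiniOp>(), std::make_shared<MiniOp>()},
                   std::make_shared<MiniOp>()};
        // Source has no data yet and nothing terminates early: the task stays blocked.
        bool blocked = !t.source_can_read();     // true
        // A build side reports 0 rows; the pipeline latches the flag and the task unblocks.
        t.pipeline->always_can_read = true;
        bool runnable = t.source_can_read();     // true
        return (blocked && runnable) ? 0 : 1;
    }
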
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]