This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6d41272421 [Opt](pipeline) Refactor the short circuit of join pipeline
(#23639)
6d41272421 is described below
commit 6d412724210f228064d08483b18b7f3827618de7
Author: HappenLee <[email protected]>
AuthorDate: Wed Aug 30 18:44:14 2023 +0800
[Opt](pipeline) Refactor the short circuit of join pipeline (#23639)
* [Opt](pipeline) Refactor the short circuit of join pipeline
* change core by cr
---
be/src/pipeline/pipeline.cpp | 12 ++--
be/src/pipeline/pipeline.h | 81 +++++++++++++++++++++------
be/src/pipeline/pipeline_fragment_context.cpp | 8 +--
be/src/pipeline/pipeline_task.cpp | 5 +-
be/src/pipeline/pipeline_task.h | 46 ++-------------
be/src/pipeline/pipeline_x/pipeline_x_task.h | 17 ++----
6 files changed, 85 insertions(+), 84 deletions(-)
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 69eaba3fbb..3c988ed441 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -32,14 +32,14 @@ void Pipeline::_init_profile() {
_pipeline_profile.reset(new RuntimeProfile(ss.str()));
}
-Status Pipeline::build_operators(Operators& operators) {
+Status Pipeline::build_operators() {
OperatorPtr pre;
for (auto& operator_t : _operator_builders) {
auto o = operator_t->build_operator();
if (pre) {
o->set_child(pre);
}
- operators.emplace_back(o);
+ _operators.emplace_back(o);
pre = std::move(o);
}
return Status::OK();
@@ -54,17 +54,17 @@ Status Pipeline::add_operator(OperatorBuilderPtr& op) {
}
Status Pipeline::add_operator(OperatorXPtr& op) {
- if (_operators.empty() && !op->is_source()) {
+ if (operatorXs.empty() && !op->is_source()) {
return Status::InternalError("Should set source before other
operator");
}
- _operators.emplace_back(op);
+ operatorXs.emplace_back(op);
return Status::OK();
}
Status Pipeline::prepare(RuntimeState* state) {
// TODO
- RETURN_IF_ERROR(_operators.back()->prepare(state));
- RETURN_IF_ERROR(_operators.back()->open(state));
+ RETURN_IF_ERROR(operatorXs.back()->prepare(state));
+ RETURN_IF_ERROR(operatorXs.back()->open(state));
RETURN_IF_ERROR(_sink_x->prepare(state));
RETURN_IF_ERROR(_sink_x->open(state));
return Status::OK();
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 114f51071d..4553c1e212 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -45,31 +45,52 @@ using PipelineId = uint32_t;
class Pipeline : public std::enable_shared_from_this<Pipeline> {
friend class PipelineTask;
+ friend class PipelineXTask;
public:
Pipeline() = delete;
explicit Pipeline(PipelineId pipeline_id,
std::weak_ptr<PipelineFragmentContext> context)
- : _complete_dependency(0), _pipeline_id(pipeline_id),
_context(context) {
+ : _pipeline_id(pipeline_id), _context(context) {
_init_profile();
}
void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
- pipeline->_parents.push_back(weak_from_this());
- _dependencies.push_back(pipeline);
+ pipeline->_parents.push_back({_operator_builders.size(),
weak_from_this()});
+ _dependencies.push_back({_operator_builders.size(), pipeline});
}
// If all dependencies are finished, this pipeline task should be
scheduled.
// e.g. Hash join probe task will be scheduled once Hash join build task
is finished.
- bool finish_one_dependency(int dependency_core_id) {
- DCHECK(_complete_dependency < _dependencies.size());
- bool finish = _complete_dependency.fetch_add(1) ==
_dependencies.size() - 1;
- if (finish) {
+ void finish_one_dependency(int dep_opr, int dependency_core_id) {
+ std::lock_guard l(_depend_mutex);
+ if (!_operators.empty() && _operators[dep_opr -
1]->can_terminate_early()) {
+ _always_can_read = true;
+ _always_can_write = (dep_opr == _operators.size());
+
+ for (int i = 0; i < _dependencies.size(); ++i) {
+ if (dep_opr == _dependencies[i].first) {
+ _dependencies.erase(_dependencies.begin(),
_dependencies.begin() + i + 1);
+ break;
+ }
+ }
+ } else {
+ for (int i = 0; i < _dependencies.size(); ++i) {
+ if (dep_opr == _dependencies[i].first) {
+ _dependencies.erase(_dependencies.begin() + i);
+ break;
+ }
+ }
+ }
+
+ if (_dependencies.empty()) {
_previous_schedule_id = dependency_core_id;
}
- return finish;
}
- bool has_dependency() { return _complete_dependency.load() <
_dependencies.size(); }
+ bool has_dependency() {
+ std::lock_guard l(_depend_mutex);
+ return !_dependencies.empty();
+ }
Status add_operator(OperatorBuilderPtr& op);
@@ -83,28 +104,28 @@ public:
OperatorBuilderBase* sink() { return _sink.get(); }
DataSinkOperatorX* sink_x() { return _sink_x.get(); }
- OperatorXs& operator_xs() { return _operators; }
+ OperatorXs& operator_xs() { return operatorXs; }
DataSinkOperatorXPtr sink_shared_pointer() { return _sink_x; }
- Status build_operators(Operators&);
+ Status build_operators();
RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
const RowDescriptor& output_row_desc() const {
- return _operators[_operators.size() - 1]->row_desc();
+ return operatorXs[operatorXs.size() - 1]->row_desc();
}
PipelineId id() const { return _pipeline_id; }
private:
void _init_profile();
- std::atomic<uint32_t> _complete_dependency;
OperatorBuilders _operator_builders; // left is _source, right is _root
OperatorBuilderPtr _sink; // put block to sink
- std::vector<std::weak_ptr<Pipeline>> _parents;
- std::vector<std::shared_ptr<Pipeline>> _dependencies;
+ std::mutex _depend_mutex;
+ std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
+ std::vector<std::pair<int, std::shared_ptr<Pipeline>>> _dependencies;
PipelineId _pipeline_id;
std::weak_ptr<PipelineFragmentContext> _context;
@@ -114,10 +135,38 @@ private:
// Operators for pipelineX. All pipeline tasks share operators from this.
// [SourceOperator -> ... -> SinkOperator]
- OperatorXs _operators;
+ OperatorXs operatorXs;
DataSinkOperatorXPtr _sink_x;
std::shared_ptr<ObjectPool> _obj_pool;
+
+ Operators _operators;
+ /**
+ * Consider the query plan below:
+ *
+ * ExchangeSource JoinBuild1
+ * \ /
+ * JoinProbe1 (Right Outer) JoinBuild2
+ * \ /
+ * JoinProbe2 (Right Outer)
+ * |
+ * Sink
+ *
+ * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should
not be blocked by ExchangeSource
+ * because we have a determined conclusion that JoinProbe1/JoinProbe2 will
also output 0 rows.
+ *
+ * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked
by Sink because JoinProbe2 will
+ * produce more data.
+ *
+ * Assume JoinBuild2 outputs 0 rows, this pipeline task should not be
blocked by ExchangeSource
+ * and Sink because JoinProbe2 will always produce 0 rows and terminate
early.
+ *
+ * In a nutshell, we should follow the rules:
+ * 1. if any operator in pipeline can terminate early, this task should
never be blocked by source operator.
+ * 2. if the last operator (except sink) can terminate early, this task
should never be blocked by sink operator.
+ */
+ bool _always_can_read = false;
+ bool _always_can_write = false;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index c8f73cbaa3..10a6518ff1 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -320,11 +320,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
// TODO pipeline 1 need to add new interface for exec node and operator
sink->init(request.fragment.output_sink);
- Operators operators;
- RETURN_IF_ERROR(pipeline->build_operators(operators));
- auto task =
- std::make_unique<PipelineTask>(pipeline, _total_tasks++,
_runtime_state.get(),
- operators, sink, this,
pipeline->pipeline_profile());
+ RETURN_IF_ERROR(pipeline->build_operators());
+ auto task = std::make_unique<PipelineTask>(pipeline, _total_tasks++,
_runtime_state.get(),
+ sink, this,
pipeline->pipeline_profile());
sink->set_child(task->get_root());
_tasks.emplace_back(std::move(task));
_runtime_profile->add_child(pipeline->pipeline_profile(), true,
nullptr);
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 3c9f82f987..035db53a27 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -41,8 +41,7 @@ class RuntimeState;
namespace doris::pipeline {
PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index,
RuntimeState* state,
- Operators& operators, OperatorPtr& sink,
- PipelineFragmentContext* fragment_context,
+ OperatorPtr& sink, PipelineFragmentContext*
fragment_context,
RuntimeProfile* parent_profile)
: _index(index),
_pipeline(pipeline),
@@ -53,7 +52,7 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t
index, RuntimeState*
_data_state(SourceState::DEPEND_ON_SOURCE),
_fragment_context(fragment_context),
_parent_profile(parent_profile),
- _operators(operators),
+ _operators(pipeline->_operators),
_source(_operators.front()),
_root(_operators.back()),
_sink(sink) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 8b407bbaeb..b7b730ad82 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -109,9 +109,8 @@ class PriorityTaskQueue;
// This class runs a pipeline task. It is the smallest scheduling unit of the task scheduler
class PipelineTask {
public:
- PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
Operators& operators,
- OperatorPtr& sink, PipelineFragmentContext* fragment_context,
- RuntimeProfile* parent_profile);
+ PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
OperatorPtr& sink,
+ PipelineFragmentContext* fragment_context, RuntimeProfile*
parent_profile);
PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
PipelineFragmentContext* fragment_context, RuntimeProfile*
parent_profile);
@@ -155,13 +154,13 @@ public:
return false;
}
- virtual bool source_can_read() { return _source->can_read() ||
_ignore_blocking_source(); }
+ virtual bool source_can_read() { return _source->can_read() ||
_pipeline->_always_can_read; }
virtual bool runtime_filters_are_ready_or_timeout() {
return _source->runtime_filters_are_ready_or_timeout();
}
- virtual bool sink_can_write() { return _sink->can_write() ||
_ignore_blocking_sink(); }
+ virtual bool sink_can_write() { return _sink->can_write() ||
_pipeline->_always_can_write; }
virtual Status finalize();
@@ -251,7 +250,7 @@ public:
protected:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
- p.lock()->finish_one_dependency(_previous_schedule_id);
+ p.second.lock()->finish_one_dependency(p.first,
_previous_schedule_id);
}
}
@@ -348,41 +347,6 @@ protected:
RuntimeProfile::Counter* _pip_task_total_timer;
private:
- /**
- * Consider the query plan below:
- *
- * ExchangeSource JoinBuild1
- * \ /
- * JoinProbe1 (Right Outer) JoinBuild2
- * \ /
- * JoinProbe2 (Right Outer)
- * |
- * Sink
- *
- * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should
not be blocked by ExchangeSource
- * because we have a determined conclusion that JoinProbe1/JoinProbe2 will
also output 0 rows.
- *
- * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked
by Sink because JoinProbe2 will
- * produce more data.
- *
- * Assume JoinBuild2 outputs 0 rows, this pipeline task should not be
blocked by ExchangeSource
- * and Sink because JoinProbe2 will always produce 0 rows and terminate
early.
- *
- * In a nutshell, we should follow the rules:
- * 1. if any operator in pipeline can terminate early, this task should
never be blocked by source operator.
- * 2. if the last operator (except sink) can terminate early, this task
should never be blocked by sink operator.
- */
- [[nodiscard]] bool _ignore_blocking_sink() { return
_root->can_terminate_early(); }
-
- [[nodiscard]] bool _ignore_blocking_source() {
- for (size_t i = 1; i < _operators.size(); i++) {
- if (_operators[i]->can_terminate_early()) {
- return true;
- }
- }
- return false;
- }
-
Operators _operators; // left is _source, right is _root
OperatorPtr _source;
OperatorPtr _root;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 864709b4ed..9dcd18b469 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -67,14 +67,16 @@ public:
Status close() override;
bool source_can_read() override {
- return _source->can_read(_state) || _ignore_blocking_source();
+ return _source->can_read(_state) || _pipeline->_always_can_read;
}
bool runtime_filters_are_ready_or_timeout() override {
return _source->runtime_filters_are_ready_or_timeout();
}
- bool sink_can_write() override { return _sink->can_write(_state) ||
_ignore_blocking_sink(); }
+ bool sink_can_write() override {
+ return _sink->can_write(_state) || _pipeline->_always_can_write;
+ }
Status finalize() override;
@@ -105,17 +107,6 @@ public:
}
private:
- [[nodiscard]] bool _ignore_blocking_sink() { return
_root->can_terminate_early(_state); }
-
- [[nodiscard]] bool _ignore_blocking_source() {
- for (size_t i = 1; i < _operators.size(); i++) {
- if (_operators[i]->can_terminate_early(_state)) {
- return true;
- }
- }
- return false;
- }
-
using DependencyMap = std::map<int, DependencySPtr>;
Status _open() override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]