This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 96164f3bdc [pipelinex](sort) Fix expression initialization order
(#23405)
96164f3bdc is described below
commit 96164f3bdcac9dc8dfebfe318de6c3b2f94f6480
Author: Gabriel <[email protected]>
AuthorDate: Thu Aug 24 17:29:24 2023 +0800
[pipelinex](sort) Fix expression initialization order (#23405)
---
be/src/pipeline/exec/operator.h | 4 ++
be/src/pipeline/exec/sort_sink_operator.cpp | 3 +-
be/src/pipeline/pipeline.cpp | 4 +-
be/src/pipeline/pipeline_task.h | 74 ++++++++++++++--------------
be/src/pipeline/pipeline_x/pipeline_x_task.h | 17 ++++++-
5 files changed, 60 insertions(+), 42 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index c6e41982a6..a03297bcf2 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -603,6 +603,10 @@ public:
Status finalize(RuntimeState* state) override { return Status::OK(); }
+ [[nodiscard]] bool can_terminate_early() override { return false; }
+
+ [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) {
return false; }
+
bool can_read() override {
LOG(FATAL) << "should not reach here!";
return false;
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 12f0d8a753..182ce7bd71 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -33,6 +33,7 @@ Status SortSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
_dependency = (SortDependency*)info.dependency;
_shared_state = (SortSharedState*)_dependency->shared_state();
+ RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
_profile = p._pool->add(new RuntimeProfile("SortSinkLocalState"));
switch (p._algorithm) {
case SortAlgorithm::HEAP_SORT: {
@@ -70,7 +71,7 @@ Status SortSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
_child_get_next_timer = ADD_TIMER(_profile, "ChildGetResultTime");
_sink_timer = ADD_TIMER(_profile, "PartialSortTotalTime");
- return p._vsort_exec_exprs.clone(state, _vsort_exec_exprs);
+ return Status::OK();
}
SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 1d1dd627ec..69eaba3fbb 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -63,8 +63,8 @@ Status Pipeline::add_operator(OperatorXPtr& op) {
Status Pipeline::prepare(RuntimeState* state) {
// TODO
- RETURN_IF_ERROR(_operators[_operators.size() - 1]->prepare(state));
- RETURN_IF_ERROR(_operators[_operators.size() - 1]->open(state));
+ RETURN_IF_ERROR(_operators.back()->prepare(state));
+ RETURN_IF_ERROR(_operators.back()->open(state));
RETURN_IF_ERROR(_sink_x->prepare(state));
RETURN_IF_ERROR(_sink_x->open(state));
return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index dea3cd27ff..57d7659197 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -155,48 +155,13 @@ public:
return false;
}
- virtual bool source_can_read() { return _source->can_read() ||
ignore_blocking_source(); }
+ virtual bool source_can_read() { return _source->can_read() ||
_ignore_blocking_source(); }
virtual bool runtime_filters_are_ready_or_timeout() {
return _source->runtime_filters_are_ready_or_timeout();
}
- /**
- * Consider the query plan below:
- *
- * ExchangeSource JoinBuild1
- * \ /
- * JoinProbe1 (Right Outer) JoinBuild2
- * \ /
- * JoinProbe2 (Right Outer)
- * |
- * Sink
- *
- * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should
not be blocked by ExchangeSource
- * because we have a determined conclusion that JoinProbe1/JoinProbe2 will
also output 0 rows.
- *
- * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked
by Sink because JoinProbe2 will
- * produce more data.
- *
- * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be
blocked by ExchangeSource
- * and Sink because JoinProbe2 will always produce 0 rows and terminate
early.
- *
- * In a nutshell, we should follow the rules:
- * 1. if any operator in pipeline can terminate early, this task should
never be blocked by source operator.
- * 2. if the last operator (except sink) can terminate early, this task
should never be blocked by sink operator.
- */
- [[nodiscard]] virtual bool ignore_blocking_sink() { return
_root->can_terminate_early(); }
-
- [[nodiscard]] virtual bool ignore_blocking_source() {
- for (size_t i = 1; i < _operators.size(); i++) {
- if (_operators[i]->can_terminate_early()) {
- return true;
- }
- }
- return false;
- }
-
- virtual bool sink_can_write() { return _sink->can_write() ||
ignore_blocking_sink(); }
+ virtual bool sink_can_write() { return _sink->can_write() ||
_ignore_blocking_sink(); }
virtual Status finalize();
@@ -381,6 +346,41 @@ protected:
RuntimeProfile::Counter* _pip_task_total_timer;
private:
+ /**
+ * Consider the query plan below:
+ *
+ * ExchangeSource JoinBuild1
+ * \ /
+ * JoinProbe1 (Right Outer) JoinBuild2
+ * \ /
+ * JoinProbe2 (Right Outer)
+ * |
+ * Sink
+ *
+ * Assume JoinBuild1/JoinBuild2 output 0 rows. This pipeline task should not be blocked by ExchangeSource
+ * because we know for certain that JoinProbe1/JoinProbe2 will also output 0 rows.
+ *
+ * Assume JoinBuild2 outputs > 0 rows. This pipeline task may be blocked by Sink because JoinProbe2 will
+ * produce more data.
+ *
+ * Assume JoinBuild2 outputs 0 rows. This pipeline task should not be blocked by ExchangeSource
+ * or by Sink, because JoinProbe2 will always produce 0 rows and can terminate early.
+ *
+ * In a nutshell, we follow these rules:
+ * 1. If any operator after the source can terminate early, this task should never be blocked by the source operator.
+ * 2. If the last operator (excluding the sink) can terminate early, this task should never be blocked by the sink operator.
+ */
+ [[nodiscard]] bool _ignore_blocking_sink() { return
_root->can_terminate_early(); }
+
+ [[nodiscard]] bool _ignore_blocking_source() {
+ for (size_t i = 1; i < _operators.size(); i++) {
+ if (_operators[i]->can_terminate_early()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
Operators _operators; // left is _source, right is _root
OperatorPtr _source;
OperatorPtr _root;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 74688fdc90..1453b10ba2 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -63,13 +63,15 @@ public:
// must be call after all pipeline task is finish to release resource
Status close() override;
- bool source_can_read() override { return _source->can_read(_state); }
+ bool source_can_read() override {
+ return _source->can_read(_state) || _ignore_blocking_source();
+ }
bool runtime_filters_are_ready_or_timeout() override {
return _source->runtime_filters_are_ready_or_timeout();
}
- bool sink_can_write() override { return _sink->can_write(_state); }
+ bool sink_can_write() override { return _sink->can_write(_state) ||
_ignore_blocking_sink(); }
Status finalize() override;
@@ -100,6 +102,17 @@ public:
}
private:
+ [[nodiscard]] bool _ignore_blocking_sink() { return
_root->can_terminate_early(_state); }
+
+ [[nodiscard]] bool _ignore_blocking_source() {
+ for (size_t i = 1; i < _operators.size(); i++) {
+ if (_operators[i]->can_terminate_early(_state)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
using DependencyMap = std::map<int, DependencySPtr>;
Status _open() override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]