This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d2a99aa03bb [refactor](scan) change scan reschedule into scan context (#27766)
d2a99aa03bb is described below
commit d2a99aa03bb0b68f46c7afa87fbc0dd530be97fa
Author: zhangstar333 <[email protected]>
AuthorDate: Mon Dec 4 10:25:52 2023 +0800
[refactor](scan) change scan reschedule into scan context (#27766)
* [refactor](scan) change scan reschedule into scan context
---
be/src/pipeline/exec/scan_operator.cpp | 4 ----
be/src/pipeline/exec/scan_operator.h | 11 -----------
be/src/vec/exec/scan/pip_scanner_context.h | 8 ++++++--
3 files changed, 6 insertions(+), 17 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index bb5a76e19b5..25afa697107 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -59,10 +59,6 @@ bool ScanOperator::can_read() {
// _scanner_ctx->no_schedule(): should schedule _scanner_ctx
return true;
} else {
- if (_node->_scanner_ctx->get_num_running_scanners() == 0 &&
- _node->_scanner_ctx->should_be_scheduled()) {
- _node->_scanner_ctx->reschedule_scanner_ctx();
- }
return _node->ready_to_read(); // there are some blocks to process
}
}
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 06fe2452c78..ebc1317b933 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -63,17 +63,6 @@ public:
ScanDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "ScanDependency", query_ctx),
_scanner_ctx(nullptr) {}
- // TODO(gabriel):
- [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
- if (_scanner_ctx && _scanner_ctx->get_num_running_scanners() == 0 &&
- _scanner_ctx->should_be_scheduled()) {
- _scanner_ctx->reschedule_scanner_ctx();
- }
- return Dependency::is_blocked_by(task);
- }
-
- bool push_to_blocking_queue() const override { return true; }
-
void block() override {
if (_scanner_done) {
return;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 6c1f8e6325c..8e4ab5c22bf 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -76,11 +76,15 @@ public:
*block = std::move(_blocks_queues[id].front());
_blocks_queues[id].pop_front();
- if (_blocks_queues[id].empty() && _dependency) {
- _dependency->block();
+ if (_blocks_queues[id].empty()) {
+ this->reschedule_scanner_ctx();
+ if (_dependency) {
+ _dependency->block();
+ }
}
}
_current_used_bytes -= (*block)->allocated_bytes();
+
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]