This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 459f75073fb [pipelineX](dependency) remove OrDependency (#27242)
459f75073fb is described below
commit 459f75073fba512232769362a4172f12a12923d9
Author: Gabriel <[email protected]>
AuthorDate: Mon Nov 20 13:05:34 2023 +0800
[pipelineX](dependency) remove OrDependency (#27242)
---
be/src/pipeline/exec/es_scan_operator.cpp | 4 +-
be/src/pipeline/exec/file_scan_operator.cpp | 4 +-
be/src/pipeline/exec/meta_scan_operator.cpp | 2 +-
be/src/pipeline/exec/olap_scan_operator.cpp | 6 +--
be/src/pipeline/exec/scan_operator.cpp | 47 ++++++---------------
be/src/pipeline/exec/scan_operator.h | 63 +++++++++++++++++-----------
be/src/pipeline/pipeline_x/dependency.cpp | 33 ---------------
be/src/pipeline/pipeline_x/dependency.h | 35 ++--------------
be/src/pipeline/pipeline_x/pipeline_x_task.h | 4 --
be/src/vec/exec/scan/pip_scanner_context.h | 26 ++++--------
be/src/vec/exec/scan/scanner_context.cpp | 4 +-
be/src/vec/exec/scan/scanner_context.h | 14 +++----
12 files changed, 80 insertions(+), 162 deletions(-)
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp
b/be/src/pipeline/exec/es_scan_operator.cpp
index 9b41155a22b..b9112917954 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -55,7 +55,7 @@ Status EsScanLocalState::_init_profile() {
Status EsScanLocalState::_process_conjuncts() {
RETURN_IF_ERROR(Base::_process_conjuncts());
- if (Base::_eos_dependency->read_blocked_by() == nullptr) {
+ if (Base::_scan_dependency->eos()) {
return Status::OK();
}
@@ -66,7 +66,7 @@ Status EsScanLocalState::_process_conjuncts() {
Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
if (_scan_ranges.empty()) {
- Base::_eos_dependency->set_ready_for_read();
+ Base::_scan_dependency->set_eos();
return Status::OK();
}
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index 019f5813a42..369ad607c6f 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -32,7 +32,7 @@ namespace doris::pipeline {
Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
if (_scan_ranges.empty()) {
- Base::_eos_dependency->set_ready_for_read();
+ Base::_scan_dependency->set_eos();
return Status::OK();
}
@@ -95,7 +95,7 @@ Status FileScanLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
Status FileScanLocalState::_process_conjuncts() {
RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts());
- if (Base::_eos_dependency->read_blocked_by() == nullptr) {
+ if (Base::_scan_dependency->eos()) {
return Status::OK();
}
// TODO: Push conjuncts down to reader.
diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp
b/be/src/pipeline/exec/meta_scan_operator.cpp
index c2f9bc7e428..2de19bb2ced 100644
--- a/be/src/pipeline/exec/meta_scan_operator.cpp
+++ b/be/src/pipeline/exec/meta_scan_operator.cpp
@@ -22,7 +22,7 @@
namespace doris::pipeline {
Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
- if (Base::_eos_dependency->read_blocked_by() == nullptr) {
+ if (Base::_scan_dependency->eos()) {
return Status::OK();
}
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index a77dba4c00a..acda79bdfb3 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -133,7 +133,7 @@ Status OlapScanLocalState::_init_profile() {
Status OlapScanLocalState::_process_conjuncts() {
SCOPED_TIMER(_process_conjunct_timer);
RETURN_IF_ERROR(ScanLocalState::_process_conjuncts());
- if (ScanLocalState::_eos_dependency->read_blocked_by() == nullptr) {
+ if (ScanLocalState::_scan_dependency->eos()) {
return Status::OK();
}
RETURN_IF_ERROR(_build_key_ranges_and_filters());
@@ -213,7 +213,7 @@ bool OlapScanLocalState::_storage_no_merge() {
Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
if (_scan_ranges.empty()) {
- ScanLocalState::_eos_dependency->set_ready_for_read();
+ ScanLocalState::_scan_dependency->set_eos();
return Status::OK();
}
SCOPED_TIMER(_scanner_init_timer);
@@ -408,7 +408,7 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
iter->second));
}
if (eos) {
- ScanLocalState::_eos_dependency->set_ready_for_read();
+ ScanLocalState::_scan_dependency->set_eos();
}
for (auto& iter : _colname_to_value_range) {
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index d2953d3593b..300ebea995b 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -122,12 +122,9 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
- _source_dependency =
OrDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
+ _scan_dependency =
ScanDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
PipelineXLocalState<>::_parent->node_id());
- _eos_dependency =
EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
-
PipelineXLocalState<>::_parent->node_id());
- _source_dependency->add_child(_eos_dependency);
auto& p = _parent->cast<typename Derived::Parent>();
set_scan_ranges(state, info.scan_ranges);
_common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
@@ -174,11 +171,10 @@ Status ScanLocalState<Derived>::open(RuntimeState* state)
{
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_process_conjuncts());
- auto status =
- _eos_dependency->read_blocked_by() == nullptr ? Status::OK() :
_prepare_scanners();
+ auto status = _scan_dependency->eos() ? Status::OK() : _prepare_scanners();
if (_scanner_ctx) {
_finish_dependency->should_finish_after_check();
- DCHECK(_eos_dependency->read_blocked_by() != nullptr &&
_num_scanners->value() > 0);
+ DCHECK(!_scan_dependency->eos() && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
}
@@ -266,7 +262,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() {
std::visit(
[&](auto&& range) {
if (range.is_empty_value_range()) {
- _eos_dependency->set_ready_for_read();
+ _scan_dependency->set_eos();
}
},
it.second.second);
@@ -559,7 +555,7 @@ Status
ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
constant_val =
const_cast<char*>(const_column->get_data_at(0).data);
if (constant_val == nullptr ||
!*reinterpret_cast<bool*>(constant_val)) {
*pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
- _eos_dependency->set_ready_for_read();
+ _scan_dependency->set_eos();
}
} else if (const vectorized::ColumnVector<vectorized::UInt8>*
bool_column =
check_and_get_column<vectorized::ColumnVector<vectorized::UInt8>>(
@@ -576,7 +572,7 @@ Status
ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
constant_val =
const_cast<char*>(bool_column->get_data_at(0).data);
if (constant_val == nullptr ||
!*reinterpret_cast<bool*>(constant_val)) {
*pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
- _eos_dependency->set_ready_for_read();
+ _scan_dependency->set_eos();
}
} else {
LOG(WARNING) << "Constant predicate in scan node should return
a bool column with "
@@ -773,7 +769,7 @@ Status
ScanLocalState<Derived>::_normalize_not_in_and_not_eq_predicate(
HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
auto fn_name = std::string("");
if (!is_fixed_range && state->null_in_set) {
- _eos_dependency->set_ready_for_read();
+ _scan_dependency->set_eos();
}
while (iter->has_next()) {
// column not in (nullptr) is always true
@@ -1166,7 +1162,7 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
std::list<vectorized::VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
if (scanners.empty()) {
- _eos_dependency->set_ready_for_read();
+ _scan_dependency->set_eos();
} else {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
RETURN_IF_ERROR(_start_scanners(scanners));
@@ -1181,14 +1177,8 @@ Status ScanLocalState<Derived>::_start_scanners(
_scanner_ctx = PipScannerContext::create_shared(state(), this,
p._output_tuple_desc, scanners,
p.limit(),
state()->scan_queue_mem_limit(),
p._col_distribute_ids, 1);
- _scanner_done_dependency =
ScannerDoneDependency::create_shared(p.operator_id(), p.node_id());
- _source_dependency->add_child(_scanner_done_dependency);
- _data_ready_dependency =
- DataReadyDependency::create_shared(p.operator_id(), p.node_id(),
_scanner_ctx.get());
- _source_dependency->add_child(_data_ready_dependency);
-
- _scanner_ctx->set_dependency(_data_ready_dependency,
_scanner_done_dependency,
- _finish_dependency);
+ _scan_dependency->set_scanner_ctx(_scanner_ctx.get());
+ _scanner_ctx->set_dependency(_scan_dependency, _finish_dependency);
return Status::OK();
}
@@ -1340,23 +1330,12 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
return Status::OK();
}
SCOPED_TIMER(_close_timer);
- if (_data_ready_dependency) {
- COUNTER_UPDATE(_wait_for_data_timer,
_data_ready_dependency->read_watcher_elapse_time());
- COUNTER_UPDATE(exec_time_counter(),
_data_ready_dependency->read_watcher_elapse_time());
- }
- if (_eos_dependency) {
- COUNTER_SET(_wait_for_eos_timer,
_eos_dependency->read_watcher_elapse_time());
- COUNTER_UPDATE(exec_time_counter(),
_eos_dependency->read_watcher_elapse_time());
- }
- if (_scanner_done_dependency) {
- COUNTER_SET(_wait_for_scanner_done_timer,
- _scanner_done_dependency->read_watcher_elapse_time());
- COUNTER_UPDATE(exec_time_counter(),
_scanner_done_dependency->read_watcher_elapse_time());
- }
+
SCOPED_TIMER(exec_time_counter());
if (_scanner_ctx.get()) {
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this),
state);
}
+ COUNTER_SET(_wait_for_dependency_timer,
_scan_dependency->read_watcher_elapse_time());
return PipelineXLocalState<>::close(state);
}
@@ -1391,7 +1370,7 @@ Status
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
}
}
- if (local_state._eos_dependency->read_blocked_by() == nullptr) {
+ if (local_state._scan_dependency->eos()) {
source_state = SourceState::FINISHED;
return Status::OK();
}
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 66543dc7ffd..f058225580d 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -56,38 +56,56 @@ public:
Status try_close(RuntimeState* state) override;
};
-class EosDependency final : public Dependency {
+class ScanDependency final : public Dependency {
public:
- ENABLE_FACTORY_CREATOR(EosDependency);
- EosDependency(int id, int node_id) : Dependency(id, node_id,
"EosDependency") {}
- void* shared_state() override { return nullptr; }
-};
-
-class ScannerDoneDependency final : public Dependency {
-public:
- ENABLE_FACTORY_CREATOR(ScannerDoneDependency);
- ScannerDoneDependency(int id, int node_id) : Dependency(id, node_id,
"ScannerDoneDependency") {}
- void* shared_state() override { return nullptr; }
-};
-
-class DataReadyDependency final : public Dependency {
-public:
- ENABLE_FACTORY_CREATOR(DataReadyDependency);
- DataReadyDependency(int id, int node_id, vectorized::ScannerContext*
scanner_ctx)
- : Dependency(id, node_id, "DataReadyDependency"),
_scanner_ctx(scanner_ctx) {}
+ ENABLE_FACTORY_CREATOR(ScanDependency);
+ ScanDependency(int id, int node_id)
+ : Dependency(id, node_id, "ScanDependency"), _scanner_ctx(nullptr)
{}
void* shared_state() override { return nullptr; }
// TODO(gabriel):
[[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
- if (_scanner_ctx->get_num_running_scanners() == 0 &&
_scanner_ctx->should_be_scheduled()) {
+ if (_scanner_ctx && _scanner_ctx->get_num_running_scanners() == 0 &&
+ _scanner_ctx->should_be_scheduled()) {
_scanner_ctx->reschedule_scanner_ctx();
}
return Dependency::read_blocked_by(task);
}
+ void block_reading() override {
+ if (_eos) {
+ return;
+ }
+ if (_scanner_done) {
+ return;
+ }
+ Dependency::block_reading();
+ }
+
+ bool eos() const { return _eos.load(); }
+ void set_eos() {
+ if (_eos) {
+ return;
+ }
+ _eos = true;
+ Dependency::set_ready_for_read();
+ }
+
+ void set_scanner_done() {
+ if (_scanner_done) {
+ return;
+ }
+ _scanner_done = true;
+ Dependency::set_ready_for_read();
+ }
+
+ void set_scanner_ctx(vectorized::ScannerContext* scanner_ctx) {
_scanner_ctx = scanner_ctx; }
+
private:
vectorized::ScannerContext* _scanner_ctx;
+ std::atomic<bool> _eos {false};
+ std::atomic<bool> _scanner_done {false};
};
class ScanLocalStateBase : public PipelineXLocalState<>, public
vectorized::RuntimeFilterConsumer {
@@ -128,10 +146,7 @@ protected:
virtual Status _init_profile() = 0;
std::atomic<bool> _opened {false};
- std::shared_ptr<EosDependency> _eos_dependency;
- std::shared_ptr<OrDependency> _source_dependency;
- std::shared_ptr<ScannerDoneDependency> _scanner_done_dependency;
- std::shared_ptr<DataReadyDependency> _data_ready_dependency;
+ std::shared_ptr<ScanDependency> _scan_dependency;
std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
@@ -203,7 +218,7 @@ class ScanLocalState : public ScanLocalStateBase {
int64_t get_push_down_count() override;
- Dependency* dependency() override { return _source_dependency.get(); }
+ Dependency* dependency() override { return _scan_dependency.get(); }
protected:
template <typename LocalStateType>
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index a06fadef03b..2e43007dee7 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -170,28 +170,6 @@ WriteDependency*
WriteDependency::write_blocked_by(PipelineXTask* task) {
return ready_for_write ? nullptr : this;
}
-Dependency* OrDependency::read_blocked_by(PipelineXTask* task) {
- // TODO(gabriel):
- for (auto& child : _children) {
- auto* cur_res = child->read_blocked_by(nullptr);
- if (cur_res == nullptr) {
- return nullptr;
- }
- }
- return this;
-}
-
-WriteDependency* OrDependency::write_blocked_by(PipelineXTask* task) {
- for (auto& child : _children) {
- CHECK(child->is_write_dependency());
- auto* cur_res =
((WriteDependency*)child.get())->write_blocked_by(nullptr);
- if (cur_res == nullptr) {
- return nullptr;
- }
- }
- return this;
-}
-
template Status HashJoinDependency::extract_join_column<true>(
vectorized::Block&,
COW<vectorized::IColumn>::mutable_ptr<vectorized::ColumnVector<unsigned char>>&,
@@ -250,17 +228,6 @@ std::string AndDependency::debug_string(int
indentation_level) {
return fmt::to_string(debug_string_buffer);
}
-std::string OrDependency::debug_string(int indentation_level) {
- fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
- std::string(indentation_level * 2, ' '), _name, _node_id);
- for (auto& child : _children) {
- fmt::format_to(debug_string_buffer, "{}, \n",
child->debug_string(indentation_level = 1));
- }
- fmt::format_to(debug_string_buffer, "{}]", std::string(indentation_level *
2, ' '));
- return fmt::to_string(debug_string_buffer);
-}
-
Status AggDependency::reset_hash_table() {
return std::visit(
[&](auto&& agg_method) {
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 11a8975b306..f6a37766525 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -55,7 +55,6 @@ public:
: _id(id), _node_id(node_id), _name(std::move(name)),
_ready_for_read(false) {}
virtual ~Dependency() = default;
- virtual bool is_or_dep() { return false; }
[[nodiscard]] int id() const { return _id; }
[[nodiscard]] virtual std::string name() const { return _name; }
virtual void* shared_state() = 0;
@@ -280,37 +279,6 @@ public:
}
};
-class OrDependency final : public WriteDependency {
-public:
- ENABLE_FACTORY_CREATOR(OrDependency);
- OrDependency(int id, int node_id) : WriteDependency(id, node_id,
"OrDependency") {}
-
- [[nodiscard]] std::string name() const override {
- fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}[", _name);
- for (auto& child : _children) {
- fmt::format_to(debug_string_buffer, "{}, ", child->name());
- }
- fmt::format_to(debug_string_buffer, "]");
- return fmt::to_string(debug_string_buffer);
- }
-
- void* shared_state() override { return nullptr; }
-
- std::string debug_string(int indentation_level = 0) override;
-
- bool is_or_dep() override { return true; }
-
- [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override;
-
- [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task)
override;
-
- void add_child(std::shared_ptr<Dependency> child) override {
- WriteDependency::add_child(child);
- child->set_parent(weak_from_this());
- }
-};
-
struct FakeSharedState {};
struct FakeDependency final : public WriteDependency {
public:
@@ -681,6 +649,9 @@ public:
}
void set_eos() {
+ if (_eos) {
+ return;
+ }
_eos = true;
WriteDependency::set_ready_for_read();
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index bc50f1e89de..04c5ddc1974 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -136,10 +136,6 @@ public:
return _use_blocking_queue || get_state() ==
PipelineTaskState::BLOCKED_FOR_DEPENDENCY;
}
void set_use_blocking_queue(bool use_blocking_queue) {
- if (_blocked_dep->is_or_dep()) {
- _use_blocking_queue = true;
- return;
- }
_use_blocking_queue = use_blocking_queue;
}
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index f02e07a6f86..fe00a9489aa 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -50,14 +50,6 @@ public:
_col_distribute_ids(col_distribute_ids),
_need_colocate_distribute(!_col_distribute_ids.empty()) {}
- void set_dependency(std::shared_ptr<DataReadyDependency> dependency,
- std::shared_ptr<ScannerDoneDependency>
scanner_done_dependency,
- std::shared_ptr<FinishDependency> finish_dependency)
override {
- _data_dependency = dependency;
- _scanner_done_dependency = scanner_done_dependency;
- _finish_dependency = finish_dependency;
- }
-
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr*
block, bool* eos,
int id, bool wait = false) override {
{
@@ -84,8 +76,8 @@ public:
*block = std::move(_blocks_queues[id].front());
_blocks_queues[id].pop_front();
- if (_blocks_queues[id].empty() && _data_dependency) {
- _data_dependency->block_reading();
+ if (_blocks_queues[id].empty() && _dependency) {
+ _dependency->block_reading();
}
}
_current_used_bytes -= (*block)->allocated_bytes();
@@ -157,8 +149,8 @@ public:
for (int j = i; j < block_size; j += queue_size) {
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
}
- if (_data_dependency) {
- _data_dependency->set_ready_for_read();
+ if (_dependency) {
+ _dependency->set_ready_for_read();
}
}
_next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
@@ -209,8 +201,8 @@ public:
_blocks_queues[i].emplace_back(std::move(_colocate_blocks[i]));
_colocate_mutable_blocks[i]->clear();
}
- if (_data_dependency) {
- _data_dependency->set_ready_for_read();
+ if (_dependency) {
+ _dependency->set_ready_for_read();
}
}
}
@@ -237,8 +229,6 @@ private:
std::vector<std::unique_ptr<vectorized::MutableBlock>>
_colocate_mutable_blocks;
std::vector<std::unique_ptr<std::mutex>> _colocate_block_mutexs;
- std::shared_ptr<DataReadyDependency> _data_dependency = nullptr;
-
void _add_rows_colocate_blocks(vectorized::Block* block, int loc,
const std::vector<int>& rows) {
int row_wait_add = rows.size();
@@ -265,8 +255,8 @@ private:
std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
}
- if (_data_dependency) {
- _data_dependency->set_ready_for_read();
+ if (_dependency) {
+ _dependency->set_ready_for_read();
}
_colocate_blocks[loc] = get_free_block();
_colocate_mutable_blocks[loc]->set_muatable_columns(
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index ef6a9d415de..b2c481871ae 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -430,8 +430,8 @@ bool ScannerContext::no_schedule() {
}
void ScannerContext::_set_scanner_done() {
- if (_scanner_done_dependency) {
- _scanner_done_dependency->set_ready_for_read();
+ if (_dependency) {
+ _dependency->set_scanner_done();
}
}
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 10b4775ceff..a0702960ac1 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -43,9 +43,8 @@ class TupleDescriptor;
namespace pipeline {
class ScanLocalStateBase;
-class ScannerDoneDependency;
+class ScanDependency;
class FinishDependency;
-class DataReadyDependency;
} // namespace pipeline
namespace taskgroup {
@@ -106,10 +105,11 @@ public:
return _process_status;
}
- virtual void set_dependency(
- std::shared_ptr<pipeline::DataReadyDependency> dependency,
- std::shared_ptr<pipeline::ScannerDoneDependency>
scanner_done_dependency,
- std::shared_ptr<pipeline::FinishDependency> finish_dependency) {}
+ void set_dependency(std::shared_ptr<pipeline::ScanDependency> dependency,
+ std::shared_ptr<pipeline::FinishDependency>
finish_dependency) {
+ _dependency = dependency;
+ _finish_dependency = finish_dependency;
+ }
// Called by ScanNode.
// Used to notify the scheduler that this ScannerContext can stop working.
@@ -283,7 +283,7 @@ protected:
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
- std::shared_ptr<pipeline::ScannerDoneDependency> _scanner_done_dependency
= nullptr;
+ std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
std::shared_ptr<pipeline::FinishDependency> _finish_dependency = nullptr;
};
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org