This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4066de375ef [bugfix](scannercore) scanner will core in destructor
during collect profile (#28727)
4066de375ef is described below
commit 4066de375efe6ff8e156a61df4f9316b3d9eaa4e
Author: yiguolei <[email protected]>
AuthorDate: Sat Dec 23 11:09:46 2023 +0800
[bugfix](scannercore) scanner will core in destructor during collect
profile (#28727)
---
be/src/exec/exec_node.h | 4 +
be/src/pipeline/exec/scan_operator.cpp | 64 ++-----
be/src/pipeline/exec/scan_operator.h | 16 +-
be/src/vec/exec/scan/pip_scanner_context.h | 20 +--
be/src/vec/exec/scan/scanner_context.cpp | 272 ++++++++++-------------------
be/src/vec/exec/scan/scanner_context.h | 47 ++---
be/src/vec/exec/scan/scanner_scheduler.cpp | 54 +++---
be/src/vec/exec/scan/scanner_scheduler.h | 4 +-
be/src/vec/exec/scan/vscan_node.cpp | 32 ++--
be/src/vec/exec/scan/vscan_node.h | 23 ++-
be/src/vec/exec/scan/vscanner.h | 13 --
11 files changed, 213 insertions(+), 336 deletions(-)
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index eeed37907f9..123097cfd53 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -239,6 +239,10 @@ public:
size_t children_count() const { return _children.size(); }
+ // when the fragment is normal finished, call this method to do some
finish work
+ // such as send the last buffer to remote.
+ virtual Status try_close(RuntimeState* state) { return Status::OK(); }
+
protected:
friend class DataSink;
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 05d9c7292f7..1e0f68131e8 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -64,14 +64,6 @@ bool ScanOperator::can_read() {
}
}
-bool ScanOperator::is_pending_finish() const {
- return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
-}
-
-Status ScanOperator::try_close(RuntimeState* state) {
- return _node->try_close(state);
-}
-
bool ScanOperator::runtime_filters_are_ready_or_timeout() {
return _node->runtime_filters_are_ready_or_timeout();
}
@@ -81,9 +73,8 @@ std::string ScanOperator::debug_string() const {
fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
SourceOperator::debug_string(), _node->_scanner_ctx ==
nullptr);
if (_node->_scanner_ctx) {
- fmt::format_to(debug_string_buffer, ", num_running_scanners = {},
num_scheduling_ctx = {} ",
- _node->_scanner_ctx->get_num_running_scanners(),
- _node->_scanner_ctx->get_num_scheduling_ctx());
+ fmt::format_to(debug_string_buffer, ", num_running_scanners = {}",
+ _node->_scanner_ctx->get_num_running_scanners());
}
return fmt::to_string(debug_string_buffer);
}
@@ -101,9 +92,6 @@ std::string ScanOperator::debug_string() const {
template <typename Derived>
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase*
parent)
: ScanLocalStateBase(state, parent) {
- _finish_dependency = std::make_shared<FinishDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
- state->get_query_ctx());
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() +
"_FILTER_DEPENDENCY",
state->get_query_ctx());
@@ -177,7 +165,6 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
auto status = _eos ? Status::OK() : _prepare_scanners();
if (_scanner_ctx) {
- _finish_dependency->block();
DCHECK(!_eos && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
@@ -570,15 +557,14 @@ std::string ScanLocalState<Derived>::debug_string(int
indentation_level) const {
PipelineXLocalState<>::debug_string(indentation_level),
_eos.load());
if (_scanner_ctx) {
fmt::format_to(debug_string_buffer, "");
- fmt::format_to(
- debug_string_buffer,
- ", Scanner Context: (_is_finished = {}, _should_stop = {}, "
- "_num_running_scanners={}, "
- "_num_scheduling_ctx = {}, _num_unfinished_scanners = {},
status = {}, error = {})",
- _scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
- _scanner_ctx->get_num_running_scanners(),
_scanner_ctx->get_num_scheduling_ctx(),
- _scanner_ctx->get_num_unfinished_scanners(),
_scanner_ctx->status().to_string(),
- _scanner_ctx->status_error());
+ fmt::format_to(debug_string_buffer,
+ ", Scanner Context: (_is_finished = {}, _should_stop =
{}, "
+ "_num_running_scanners={}, "
+ " _num_unfinished_scanners = {}, status = {}, error =
{})",
+ _scanner_ctx->is_finished(),
_scanner_ctx->should_stop(),
+ _scanner_ctx->get_num_running_scanners(),
+ _scanner_ctx->get_num_unfinished_scanners(),
+ _scanner_ctx->status().to_string(),
_scanner_ctx->status_error());
}
return fmt::to_string(debug_string_buffer);
@@ -1226,24 +1212,27 @@ template <typename Derived>
Status ScanLocalState<Derived>::_prepare_scanners() {
std::list<vectorized::VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
+ // Init scanner wrapper
+ for (auto it = scanners.begin(); it != scanners.end(); ++it) {
+
_scanners.emplace_back(std::make_shared<vectorized::ScannerDelegate>(*it));
+ }
if (scanners.empty()) {
_eos = true;
_scan_dependency->set_ready();
} else {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
- RETURN_IF_ERROR(_start_scanners(scanners));
+ RETURN_IF_ERROR(_start_scanners(_scanners));
}
return Status::OK();
}
template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
- const std::list<vectorized::VScannerSPtr>& scanners) {
+ const std::list<std::shared_ptr<vectorized::ScannerDelegate>>&
scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipScannerContext::create_shared(state(), this,
p._output_tuple_desc, scanners,
p.limit(),
state()->scan_queue_mem_limit(),
- p._col_distribute_ids, 1,
_scan_dependency,
- _finish_dependency);
+ p._col_distribute_ids, 1,
_scan_dependency);
return Status::OK();
}
@@ -1319,9 +1308,6 @@ Status ScanLocalState<Derived>::_init_profile() {
_max_scanner_thread_num = ADD_COUNTER(_runtime_profile,
"MaxScannerThreadNum", TUnit::UNIT);
- _wait_for_finish_dependency_timer =
- ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
-
return Status::OK();
}
@@ -1429,17 +1415,6 @@ Status ScanOperatorX<LocalStateType>::open(RuntimeState*
state) {
return Status::OK();
}
-template <typename LocalStateType>
-Status ScanOperatorX<LocalStateType>::try_close(RuntimeState* state) {
- auto& local_state = get_local_state(state);
- if (local_state._scanner_ctx) {
- // mark this scanner ctx as should_stop to make sure scanners will not
be scheduled anymore
- // TODO: there is a lock in `set_should_stop` may cause some slight
impact
- local_state._scanner_ctx->set_should_stop();
- }
- return Status::OK();
-}
-
template <typename Derived>
Status ScanLocalState<Derived>::close(RuntimeState* state) {
if (_closed) {
@@ -1451,10 +1426,9 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
SCOPED_TIMER(exec_time_counter());
if (_scanner_ctx) {
-
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this),
state);
+ _scanner_ctx->stop_scanners(state);
}
COUNTER_SET(_wait_for_dependency_timer,
_scan_dependency->watcher_elapse_time());
- COUNTER_SET(_wait_for_finish_dependency_timer,
_finish_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
return PipelineXLocalState<>::close(state);
@@ -1511,7 +1485,7 @@ Status
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
if (eos) {
source_state = SourceState::FINISHED;
// reach limit, stop the scanners.
- local_state._scanner_ctx->set_should_stop();
+ local_state._scanner_ctx->stop_scanners(state);
}
return Status::OK();
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 3690e9eb39c..bf083d82d5d 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -31,6 +31,9 @@
namespace doris {
class ExecNode;
} // namespace doris
+namespace doris::vectorized {
+class ScannerDelegate;
+}
namespace doris::pipeline {
class PipScannerContext;
@@ -48,13 +51,9 @@ public:
bool can_read() override; // for source
- bool is_pending_finish() const override;
-
bool runtime_filters_are_ready_or_timeout() override;
std::string debug_string() const override;
-
- Status try_close(RuntimeState* state) override;
};
class ScanDependency final : public Dependency {
@@ -171,7 +170,6 @@ protected:
RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
// time of prefilter input block from scanner
RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
- RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
};
@@ -214,7 +212,6 @@ class ScanLocalState : public ScanLocalStateBase {
Dependency* dependency() override { return _scan_dependency.get(); }
RuntimeFilterDependency* filterdependency() override { return
_filter_dependency.get(); };
- Dependency* finishdependency() override { return _finish_dependency.get();
}
protected:
template <typename LocalStateType>
@@ -350,7 +347,7 @@ protected:
Status _prepare_scanners();
// Submit the scanner to the thread pool and start execution
- Status _start_scanners(const std::list<vectorized::VScannerSPtr>&
scanners);
+ Status _start_scanners(const
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners);
// For some conjunct there is chance to elimate cast operator
// Eg. Variant's sub column could eliminate cast in storage layer if
@@ -413,14 +410,13 @@ protected:
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
- std::shared_ptr<Dependency> _finish_dependency;
+ // ScanLocalState owns the ownership of scanner, scanner context only has
its weakptr
+ std::list<std::shared_ptr<vectorized::ScannerDelegate>> _scanners;
};
template <typename LocalStateType>
class ScanOperatorX : public OperatorX<LocalStateType> {
public:
- Status try_close(RuntimeState* state) override;
-
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override { return
OperatorXBase::prepare(state); }
Status open(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index 309aed96a8c..56ceb20bf15 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -31,9 +31,9 @@ class PipScannerContext : public vectorized::ScannerContext {
public:
PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
const TupleDescriptor* output_tuple_desc,
- const std::list<vectorized::VScannerSPtr>& scanners,
int64_t limit_,
- int64_t max_bytes_in_blocks_queue, const
std::vector<int>& col_distribute_ids,
- const int num_parallel_instances)
+ const
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
+ int64_t limit_, int64_t max_bytes_in_blocks_queue,
+ const std::vector<int>& col_distribute_ids, const int
num_parallel_instances)
: vectorized::ScannerContext(state, parent, output_tuple_desc,
scanners, limit_,
max_bytes_in_blocks_queue,
num_parallel_instances),
_col_distribute_ids(col_distribute_ids),
@@ -41,14 +41,13 @@ public:
PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc,
- const std::list<vectorized::VScannerSPtr>& scanners,
int64_t limit_,
- int64_t max_bytes_in_blocks_queue, const
std::vector<int>& col_distribute_ids,
- const int num_parallel_instances,
- std::shared_ptr<pipeline::ScanDependency> dependency,
- std::shared_ptr<pipeline::Dependency> finish_dependency)
+ const
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
+ int64_t limit_, int64_t max_bytes_in_blocks_queue,
+ const std::vector<int>& col_distribute_ids, const int
num_parallel_instances,
+ std::shared_ptr<pipeline::ScanDependency> dependency)
: vectorized::ScannerContext(state, output_tuple_desc, scanners,
limit_,
max_bytes_in_blocks_queue,
num_parallel_instances,
- local_state, dependency,
finish_dependency),
+ local_state, dependency),
_need_colocate_distribute(false) {}
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr*
block, bool* eos,
@@ -111,9 +110,6 @@ public:
return Status::OK();
}
- // We should make those method lock free.
- bool done() override { return _is_finished || _should_stop; }
-
void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks)
override {
const int queue_size = _blocks_queues.size();
const int block_size = blocks.size();
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 5ad2dbec5b6..908b2a663b7 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -46,11 +46,11 @@ namespace doris::vectorized {
using namespace std::chrono_literals;
ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor*
output_tuple_desc,
- const std::list<VScannerSPtr>& scanners,
int64_t limit_,
- int64_t max_bytes_in_blocks_queue, const int
num_parallel_instances,
+ const
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
+ int64_t limit_, int64_t
max_bytes_in_blocks_queue,
+ const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state,
- std::shared_ptr<pipeline::ScanDependency>
dependency,
- std::shared_ptr<pipeline::Dependency>
finish_dependency)
+ std::shared_ptr<pipeline::ScanDependency>
dependency)
: _state(state),
_parent(nullptr),
_local_state(local_state),
@@ -61,11 +61,10 @@ ScannerContext::ScannerContext(RuntimeState* state, const
TupleDescriptor* outpu
_max_bytes_in_queue(std::max(max_bytes_in_blocks_queue,
(int64_t)1024) *
num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
- _scanners(scanners),
- _scanners_ref(scanners.begin(), scanners.end()),
+ _scanners(scanners.begin(), scanners.end()),
+ _all_scanners(scanners.begin(), scanners.end()),
_num_parallel_instances(num_parallel_instances),
- _dependency(dependency),
- _finish_dependency(finish_dependency) {
+ _dependency(dependency) {
// Use the task exec context as a lock between scanner threads and
fragment exection threads
_task_exec_ctx = _state->get_task_execution_context();
_query_id = _state->get_query_ctx()->query_id();
@@ -92,8 +91,9 @@ ScannerContext::ScannerContext(RuntimeState* state, const
TupleDescriptor* outpu
ScannerContext::ScannerContext(doris::RuntimeState* state,
doris::vectorized::VScanNode* parent,
const doris::TupleDescriptor* output_tuple_desc,
- const std::list<VScannerSPtr>& scanners,
int64_t limit_,
- int64_t max_bytes_in_blocks_queue, const int
num_parallel_instances,
+ const
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
+ int64_t limit_, int64_t
max_bytes_in_blocks_queue,
+ const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state)
: _state(state),
_parent(parent),
@@ -105,8 +105,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state,
doris::vectorized::VS
_max_bytes_in_queue(std::max(max_bytes_in_blocks_queue,
(int64_t)1024) *
num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
- _scanners(scanners),
- _scanners_ref(scanners.begin(), scanners.end()),
+ _scanners(scanners.begin(), scanners.end()),
+ _all_scanners(scanners.begin(), scanners.end()),
_num_parallel_instances(num_parallel_instances) {
// Use the task exec context as a lock between scanner threads and
fragment exection threads
_task_exec_ctx = _state->get_task_execution_context();
@@ -182,10 +182,6 @@ Status ScannerContext::init() {
}
#endif
- // 4. This ctx will be submitted to the scanner scheduler right after init.
- // So set _num_scheduling_ctx to 1 here.
- _num_scheduling_ctx = 1;
-
_num_unfinished_scanners = _scanners.size();
if (_parent) {
@@ -275,11 +271,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
bool is_scheduled = false;
if (!done() && to_be_schedule && _num_running_scanners == 0) {
is_scheduled = true;
- auto state = _scanner_scheduler->submit(shared_from_this());
- if (state.ok()) {
- _num_scheduling_ctx++;
- } else {
- set_status_on_error(state, false);
+ auto submit_status =
_scanner_scheduler->submit(shared_from_this());
+ if (!submit_status.ok()) {
+ set_status_on_error(submit_status, false);
}
}
@@ -370,41 +364,17 @@ Status ScannerContext::validate_block_schema(Block*
block) {
return Status::OK();
}
-void ScannerContext::set_should_stop() {
- std::lock_guard l(_transfer_lock);
- _should_stop = true;
- _set_scanner_done();
- for (const VScannerWPtr& scanner : _scanners_ref) {
- if (VScannerSPtr sc = scanner.lock()) {
- sc->try_stop();
- }
- }
- _blocks_queue_added_cv.notify_one();
- set_ready_to_finish();
-}
-
void ScannerContext::inc_num_running_scanners(int32_t inc) {
std::lock_guard l(_transfer_lock);
_num_running_scanners += inc;
}
-void ScannerContext::dec_num_scheduling_ctx() {
+void ScannerContext::dec_num_running_scanners(int32_t scanner_dec) {
std::lock_guard l(_transfer_lock);
- _num_scheduling_ctx--;
- set_ready_to_finish();
- if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
- _ctx_finish_cv.notify_one();
- }
-}
-
-void ScannerContext::set_ready_to_finish() {
- // `_should_stop == true` means this task has already ended and wait for
pending finish now.
- if (_finish_dependency && done() && _num_running_scanners == 0 &&
_num_scheduling_ctx == 0) {
- _finish_dependency->set_ready();
- }
+ _num_running_scanners -= scanner_dec;
}
-bool ScannerContext::set_status_on_error(const Status& status, bool need_lock)
{
+void ScannerContext::set_status_on_error(const Status& status, bool need_lock)
{
std::unique_lock l(_transfer_lock, std::defer_lock);
if (need_lock) {
l.lock();
@@ -415,14 +385,20 @@ bool ScannerContext::set_status_on_error(const Status&
status, bool need_lock) {
_blocks_queue_added_cv.notify_one();
_should_stop = true;
_set_scanner_done();
- return true;
}
- return false;
}
-template <typename Parent>
-Status ScannerContext::_close_and_clear_scanners(Parent* parent, RuntimeState*
state) {
- std::unique_lock l(_scanners_lock);
+void ScannerContext::stop_scanners(RuntimeState* state) {
+ std::unique_lock l(_transfer_lock);
+ _should_stop = true;
+ _set_scanner_done();
+ for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
+ if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
+ sc->_scanner->try_stop();
+ }
+ }
+ _blocks_queue.clear();
+ // TODO yiguolei, call mark close to scanners
if (state->enable_profile()) {
std::stringstream scanner_statistics;
std::stringstream scanner_rows_read;
@@ -430,76 +406,38 @@ Status ScannerContext::_close_and_clear_scanners(Parent*
parent, RuntimeState* s
scanner_statistics << "[";
scanner_rows_read << "[";
scanner_wait_worker_time << "[";
- for (auto finished_scanner_time : _finished_scanner_runtime) {
- scanner_statistics << PrettyPrinter::print(finished_scanner_time,
TUnit::TIME_NS)
- << ", ";
- }
- for (auto finished_scanner_rows : _finished_scanner_rows_read) {
- scanner_rows_read << PrettyPrinter::print(finished_scanner_rows,
TUnit::UNIT) << ", ";
- }
- for (auto finished_scanner_wait_time :
_finished_scanner_wait_worker_time) {
- scanner_wait_worker_time
- << PrettyPrinter::print(finished_scanner_wait_time,
TUnit::TIME_NS) << ", ";
- }
- // Only unfinished scanners here
- for (auto& scanner : _scanners) {
- // Scanners are in ObjPool in ScanNode,
- // so no need to delete them here.
+ // Scanners can in 3 state
+ // state 1: in scanner context, not scheduled
+ // state 2: in scanner worker pool's queue, scheduled but not running
+ // state 3: scanner is running.
+ for (auto& scanner_ref : _all_scanners) {
+ auto scanner = scanner_ref.lock();
+ if (scanner == nullptr) {
+ continue;
+ }
// Add per scanner running time before close them
- scanner_statistics <<
PrettyPrinter::print(scanner->get_time_cost_ns(), TUnit::TIME_NS)
+ scanner_statistics <<
PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
+ TUnit::TIME_NS)
<< ", ";
- scanner_rows_read <<
PrettyPrinter::print(scanner->get_rows_read(), TUnit::UNIT)
+ scanner_rows_read <<
PrettyPrinter::print(scanner->_scanner->get_rows_read(),
+ TUnit::UNIT)
<< ", ";
scanner_wait_worker_time
- <<
PrettyPrinter::print(scanner->get_scanner_wait_worker_timer(),
+ <<
PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
TUnit::TIME_NS)
<< ", ";
+ // since there are all scanners, some scanners is running, so that
could not call scanner
+ // close here.
}
scanner_statistics << "]";
scanner_rows_read << "]";
scanner_wait_worker_time << "]";
- parent->scanner_profile()->add_info_string("PerScannerRunningTime",
- scanner_statistics.str());
- parent->scanner_profile()->add_info_string("PerScannerRowsRead",
scanner_rows_read.str());
- parent->scanner_profile()->add_info_string("PerScannerWaitTime",
-
scanner_wait_worker_time.str());
- }
- // Only unfinished scanners here
- for (auto& scanner : _scanners) {
- static_cast<void>(scanner->close(state));
- // Scanners are in ObjPool in ScanNode,
- // so no need to delete them here.
+ _scanner_profile->add_info_string("PerScannerRunningTime",
scanner_statistics.str());
+ _scanner_profile->add_info_string("PerScannerRowsRead",
scanner_rows_read.str());
+ _scanner_profile->add_info_string("PerScannerWaitTime",
scanner_wait_worker_time.str());
}
- _scanners.clear();
- return Status::OK();
-}
-
-template <typename Parent>
-void ScannerContext::clear_and_join(Parent* parent, RuntimeState* state) {
- std::unique_lock l(_transfer_lock);
- do {
- if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
- break;
- } else {
- DCHECK(!state->enable_pipeline_exec())
- << " _num_running_scanners: " << _num_running_scanners
- << " _num_scheduling_ctx: " << _num_scheduling_ctx;
- while (!(_num_running_scanners == 0 && _num_scheduling_ctx == 0)) {
- _ctx_finish_cv.wait(l);
- }
- break;
- }
- } while (false);
- // Must wait all running scanners stop running.
- // So that we can make sure to close all scanners.
- static_cast<void>(_close_and_clear_scanners(parent, state));
-
- _blocks_queue.clear();
-}
-bool ScannerContext::no_schedule() {
- std::unique_lock l(_transfer_lock);
- return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
+ _blocks_queue_added_cv.notify_one();
}
void ScannerContext::_set_scanner_done() {
@@ -512,12 +450,11 @@ std::string ScannerContext::debug_string() {
return fmt::format(
"id: {}, sacnners: {}, blocks in queue: {},"
" status: {}, _should_stop: {}, _is_finished: {}, free blocks: {},"
- " limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {},
_max_thread_num: {},"
+ " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
" _block_per_scanner: {}, _cur_bytes_in_queue: {},
MAX_BYTE_OF_QUEUE: {}",
ctx_id, _scanners.size(), _blocks_queue.size(), status().ok(),
_should_stop,
- _is_finished, _free_blocks.size_approx(), limit,
_num_running_scanners,
- _num_scheduling_ctx, _max_thread_num, _block_per_scanner,
_cur_bytes_in_queue,
- _max_bytes_in_queue);
+ _is_finished, _free_blocks.size_approx(), limit,
_num_running_scanners, _max_thread_num,
+ _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue);
}
void ScannerContext::reschedule_scanner_ctx() {
@@ -525,84 +462,67 @@ void ScannerContext::reschedule_scanner_ctx() {
if (done()) {
return;
}
- auto state = _scanner_scheduler->submit(shared_from_this());
+ auto submit_status = _scanner_scheduler->submit(shared_from_this());
//todo(wb) rethinking is it better to mark current scan_context failed
when submit failed many times?
- if (state.ok()) {
- _num_scheduling_ctx++;
- } else {
- set_status_on_error(state, false);
+ if (!submit_status.ok()) {
+ set_status_on_error(submit_status, false);
}
}
-void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
- {
- std::unique_lock l(_scanners_lock);
- _scanners.push_front(scanner);
- }
+void
ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate>
scanner) {
std::lock_guard l(_transfer_lock);
-
- // In pipeline engine, doris will close scanners when `no_schedule`.
- // We have to decrease _num_running_scanners before schedule, otherwise
- // schedule does not woring due to _num_running_scanners.
- _num_running_scanners--;
- set_ready_to_finish();
-
- if (!done() && should_be_scheduled()) {
- auto state = _scanner_scheduler->submit(shared_from_this());
- if (state.ok()) {
- _num_scheduling_ctx++;
- } else {
- set_status_on_error(state, false);
+ // Use a transfer lock to avoid the scanner be scheduled concurrently. For
example, that after
+ // calling "_scanners.push_front(scanner)", there may be other ctx in
scheduler
+ // to schedule that scanner right away, and in that schedule run, the
scanner may be marked as closed
+ // before we call the following if() block.
+ if (scanner->_scanner->need_to_close()) {
+ --_num_unfinished_scanners;
+ if (_num_unfinished_scanners == 0) {
+ _dispose_coloate_blocks_not_in_queue();
+ _is_finished = true;
+ _set_scanner_done();
+ _blocks_queue_added_cv.notify_one();
+ return;
}
}
- // Notice that after calling "_scanners.push_front(scanner)", there may be
other ctx in scheduler
- // to schedule that scanner right away, and in that schedule run, the
scanner may be marked as closed
- // before we call the following if() block.
- // So we need "scanner->set_counted_down()" to avoid
"_num_unfinished_scanners" being decreased twice by
- // same scanner.
- if (scanner->need_to_close() && scanner->set_counted_down() &&
- (--_num_unfinished_scanners) == 0) {
- _dispose_coloate_blocks_not_in_queue();
- _is_finished = true;
- _set_scanner_done();
- _blocks_queue_added_cv.notify_one();
+ _scanners.push_front(scanner);
+
+ if (should_be_scheduled()) {
+ auto submit_status = _scanner_scheduler->submit(shared_from_this());
+ if (!submit_status.ok()) {
+ set_status_on_error(submit_status, false);
+ }
}
- _ctx_finish_cv.notify_one();
}
-void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>*
current_run) {
+// This method is called in scanner scheduler, and task context is hold
+void ScannerContext::get_next_batch_of_scanners(
+ std::list<std::weak_ptr<ScannerDelegate>>* current_run) {
+ std::lock_guard l(_transfer_lock);
+ // Update the sched counter for profile
+ Defer defer {[&]() { _scanner_sched_counter->update(current_run->size());
}};
// 1. Calculate how many scanners should be scheduled at this run.
- int thread_slot_num = 0;
- {
- // If there are enough space in blocks queue,
- // the scanner number depends on the _free_blocks numbers
- thread_slot_num = get_available_thread_slot_num();
- }
+ // If there are enough space in blocks queue,
+ // the scanner number depends on the _free_blocks numbers
+ int thread_slot_num = get_available_thread_slot_num();
// 2. get #thread_slot_num scanners from ctx->scanners
// and put them into "this_run".
- {
- std::unique_lock l(_scanners_lock);
- for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
- VScannerSPtr scanner = _scanners.front();
- _scanners.pop_front();
- if (scanner->need_to_close()) {
-
_finished_scanner_runtime.push_back(scanner->get_time_cost_ns());
-
_finished_scanner_rows_read.push_back(scanner->get_rows_read());
- _finished_scanner_wait_worker_time.push_back(
- scanner->get_scanner_wait_worker_timer());
- static_cast<void>(scanner->close(_state));
- } else {
- current_run->push_back(scanner);
- i++;
- }
+ for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
+ std::weak_ptr<ScannerDelegate> scanner_ref = _scanners.front();
+ std::shared_ptr<ScannerDelegate> scanner = scanner_ref.lock();
+ _scanners.pop_front();
+ if (scanner == nullptr) {
+ continue;
+ }
+ if (scanner->_scanner->need_to_close()) {
+ static_cast<void>(scanner->_scanner->close(_state));
+ } else {
+ current_run->push_back(scanner_ref);
+ i++;
}
}
}
-template void ScannerContext::clear_and_join(pipeline::ScanLocalStateBase*
parent,
- RuntimeState* state);
-template void ScannerContext::clear_and_join(VScanNode* parent, RuntimeState*
state);
-
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index ba9c1fdee10..e320eb55b2e 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -53,6 +53,7 @@ class TaskGroup;
namespace vectorized {
class VScanner;
+class ScannerDelegate;
class VScanNode;
class ScannerScheduler;
class SimplifiedScanScheduler;
@@ -70,7 +71,7 @@ class ScannerContext : public
std::enable_shared_from_this<ScannerContext> {
public:
ScannerContext(RuntimeState* state, VScanNode* parent, const
TupleDescriptor* output_tuple_desc,
- const std::list<VScannerSPtr>& scanners, int64_t limit_,
+ const std::list<std::shared_ptr<ScannerDelegate>>&
scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const int
num_parallel_instances = 1,
pipeline::ScanLocalStateBase* local_state = nullptr);
@@ -92,9 +93,9 @@ public:
// When a scanner complete a scan, this method will be called
// to return the scanner to the list for next scheduling.
- void push_back_scanner_and_reschedule(VScannerSPtr scanner);
+ void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate>
scanner);
- bool set_status_on_error(const Status& status, bool need_lock = true);
+ void set_status_on_error(const Status& status, bool need_lock = true);
Status status() {
if (_process_status.is<ErrorCode::END_OF_FILE>()) {
@@ -103,34 +104,21 @@ public:
return _process_status;
}
- // Called by ScanNode.
- // Used to notify the scheduler that this ScannerContext can stop working.
- void set_should_stop();
-
// Return true if this ScannerContext need no more process
- virtual bool done() { return _is_finished || _should_stop; }
+ bool done() const { return _is_finished || _should_stop; }
bool is_finished() { return _is_finished.load(); }
bool should_stop() { return _should_stop.load(); }
bool status_error() { return _status_error.load(); }
void inc_num_running_scanners(int32_t scanner_inc);
- void set_ready_to_finish();
+ void dec_num_running_scanners(int32_t scanner_dec);
int get_num_running_scanners() const { return _num_running_scanners; }
int get_num_unfinished_scanners() const { return _num_unfinished_scanners;
}
- void dec_num_scheduling_ctx();
-
- int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }
-
- void get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run);
-
- template <typename Parent>
- void clear_and_join(Parent* parent, RuntimeState* state);
-
- bool no_schedule();
+ void get_next_batch_of_scanners(std::list<std::weak_ptr<ScannerDelegate>>*
current_run);
virtual std::string debug_string();
@@ -138,7 +126,6 @@ public:
void incr_num_ctx_scheduling(int64_t num) {
_scanner_ctx_sched_counter->update(num); }
void incr_ctx_scheduling_time(int64_t num) {
_scanner_ctx_sched_time->update(num); }
- void incr_num_scanner_scheduling(int64_t num) {
_scanner_sched_counter->update(num); }
std::string parent_name();
@@ -146,7 +133,7 @@ public:
// todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when
executing shared scan
inline bool should_be_scheduled() const {
- return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
+ return !done() && (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
(_serving_blocks_num < allowed_blocks_num());
}
@@ -169,6 +156,8 @@ public:
SimplifiedScanScheduler* get_simple_scan_scheduler() { return
_simple_scan_scheduler; }
+ void stop_scanners(RuntimeState* state);
+
void reschedule_scanner_ctx();
// the unique id of this context
@@ -181,17 +170,12 @@ public:
std::weak_ptr<TaskExecutionContext> get_task_execution_context() { return
_task_exec_ctx; }
-private:
- template <typename Parent>
- Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
-
protected:
ScannerContext(RuntimeState* state_, const TupleDescriptor*
output_tuple_desc,
- const std::list<VScannerSPtr>& scanners_, int64_t limit_,
+ const std::list<std::shared_ptr<ScannerDelegate>>&
scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_, const int
num_parallel_instances,
pipeline::ScanLocalStateBase* local_state,
- std::shared_ptr<pipeline::ScanDependency> dependency,
- std::shared_ptr<pipeline::Dependency> finish_dependency);
+ std::shared_ptr<pipeline::ScanDependency> dependency);
virtual void _dispose_coloate_blocks_not_in_queue() {}
void _set_scanner_done();
@@ -275,9 +259,11 @@ protected:
// and then if the scanner is not finished, will be pushed back to this
list.
// Not need to protect by lock, because only one scheduler thread will
access to it.
std::mutex _scanners_lock;
- std::list<VScannerSPtr> _scanners;
+ // Scanner's ownership belongs to vscannode or scanoperator; the scanner
context does not own it.
+ // ScannerContext has to check whether the scanner has been destructed before using it.
+ std::list<std::weak_ptr<ScannerDelegate>> _scanners;
// weak pointer for _scanners, used in stop function
- std::vector<VScannerWPtr> _scanners_ref;
+ std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
std::vector<int64_t> _finished_scanner_runtime;
std::vector<int64_t> _finished_scanner_rows_read;
std::vector<int64_t> _finished_scanner_wait_worker_time;
@@ -294,7 +280,6 @@ protected:
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
- std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index e8d7f8a7139..a67b9d7f27a 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -180,20 +180,14 @@ void
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
watch.reset();
watch.start();
ctx->incr_num_ctx_scheduling(1);
- size_t size = 0;
- Defer defer {[&]() {
- ctx->incr_num_scanner_scheduling(size);
- ctx->dec_num_scheduling_ctx();
- }};
if (ctx->done()) {
return;
}
- std::list<VScannerSPtr> this_run;
+ std::list<std::weak_ptr<ScannerDelegate>> this_run;
ctx->get_next_batch_of_scanners(&this_run);
- size = this_run.size();
- if (!size) {
+ if (this_run.empty()) {
// There will be 2 cases when this_run is empty:
// 1. The blocks queue reaches limit.
// The consumer will continue scheduling the ctx.
@@ -212,9 +206,14 @@ void
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
if (ctx->thread_token != nullptr) {
// TODO llj tg how to treat this?
while (iter != this_run.end()) {
- (*iter)->start_wait_worker_timer();
- auto s = ctx->thread_token->submit_func(
- [this, scanner = *iter, ctx] { this->_scanner_scan(this,
ctx, scanner); });
+ std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
+ if (scanner_delegate == nullptr) {
+ continue;
+ }
+ scanner_delegate->_scanner->start_wait_worker_timer();
+ auto s = ctx->thread_token->submit_func([this, scanner_ref =
*iter, ctx]() {
+ this->_scanner_scan(this, ctx, scanner_ref);
+ });
if (s.ok()) {
this_run.erase(iter++);
} else {
@@ -224,28 +223,32 @@ void
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
}
} else {
while (iter != this_run.end()) {
- (*iter)->start_wait_worker_timer();
- TabletStorageType type = (*iter)->get_storage_type();
+ std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
+ if (scanner_delegate == nullptr) {
+ continue;
+ }
+ scanner_delegate->_scanner->start_wait_worker_timer();
+ TabletStorageType type =
scanner_delegate->_scanner->get_storage_type();
bool ret = false;
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
- auto work_func = [this, scanner = *iter, ctx] {
- this->_scanner_scan(this, ctx, scanner);
+ auto work_func = [this, scanner_ref = *iter, ctx]() {
+ this->_scanner_scan(this, ctx, scanner_ref);
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
ret =
scan_sche->get_scan_queue()->try_put(simple_scan_task);
} else {
PriorityThreadPool::Task task;
- task.work_function = [this, scanner = *iter, ctx] {
- this->_scanner_scan(this, ctx, scanner);
+ task.work_function = [this, scanner_ref = *iter, ctx]() {
+ this->_scanner_scan(this, ctx, scanner_ref);
};
task.priority = nice;
ret = _local_scan_thread_pool->offer(task);
}
} else {
PriorityThreadPool::Task task;
- task.work_function = [this, scanner = *iter, ctx] {
- this->_scanner_scan(this, ctx, scanner);
+ task.work_function = [this, scanner_ref = *iter, ctx]() {
+ this->_scanner_scan(this, ctx, scanner_ref);
};
task.priority = nice;
ret = _remote_scan_thread_pool->offer(task);
@@ -263,13 +266,22 @@ void
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
}
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
- std::shared_ptr<ScannerContext> ctx,
VScannerSPtr scanner) {
+ std::shared_ptr<ScannerContext> ctx,
+ std::weak_ptr<ScannerDelegate>
scanner_ref) {
+ Defer defer {[&]() { ctx->dec_num_running_scanners(1); }};
auto task_lock = ctx->get_task_execution_context().lock();
if (task_lock == nullptr) {
// LOG(WARNING) << "could not lock task execution context, query " <<
print_id(_query_id)
// << " maybe finished";
return;
}
+ // will release the scanner if it is the last reference; the task lock is held here, to
ensure
+ // that the scanner can call the scan node's methods during its destructor
+ std::shared_ptr<ScannerDelegate> scanner_delegate = scanner_ref.lock();
+ auto& scanner = scanner_delegate->_scanner;
+ if (scanner_delegate == nullptr) {
+ return;
+ }
SCOPED_ATTACH_TASK(scanner->runtime_state());
// for cpu hard limit, thread name should not be reset
if (ctx->_should_reset_thread_name) {
@@ -400,7 +412,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler,
if (eos || should_stop) {
scanner->mark_to_need_to_close();
}
- ctx->push_back_scanner_and_reschedule(scanner);
+ ctx->push_back_scanner_and_reschedule(scanner_delegate);
}
void ScannerScheduler::_register_metrics() {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index eb4d1380e39..9fedd27dbd8 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -36,7 +36,7 @@ class BlockingQueue;
} // namespace doris
namespace doris::vectorized {
-
+class ScannerDelegate;
class ScannerContext;
// Responsible for the scheduling and execution of all Scanners of a BE node.
@@ -79,7 +79,7 @@ private:
void _schedule_scanners(std::shared_ptr<ScannerContext> ctx);
// execution thread function
void _scanner_scan(ScannerScheduler* scheduler,
std::shared_ptr<ScannerContext> ctx,
- VScannerSPtr scanner);
+ std::weak_ptr<ScannerDelegate> scanner);
void _register_metrics();
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 5176d7900b3..b780fc1a8a9 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -273,7 +273,7 @@ Status VScanNode::get_next(RuntimeState* state,
vectorized::Block* block, bool*
reached_limit(block, eos);
if (*eos) {
// reach limit, stop the scanners.
- _scanner_ctx->set_should_stop();
+ _scanner_ctx->stop_scanners(state);
}
return Status::OK();
@@ -318,8 +318,8 @@ Status VScanNode::_init_profile() {
return Status::OK();
}
-Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners,
- const int query_parallel_instance_num) {
+void VScanNode::_start_scanners(const
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
+ const int query_parallel_instance_num) {
if (_is_pipeline_scan) {
int max_queue_size = _shared_scan_opt ?
std::max(query_parallel_instance_num, 1) : 1;
_scanner_ctx = pipeline::PipScannerContext::create_shared(
@@ -329,41 +329,29 @@ Status VScanNode::_start_scanners(const
std::list<VScannerSPtr>& scanners,
_scanner_ctx = ScannerContext::create_shared(_state, this,
_output_tuple_desc, scanners,
limit(),
_state->scan_queue_mem_limit());
}
- return Status::OK();
}
Status VScanNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
+
RETURN_IF_ERROR(ExecNode::close(state));
return Status::OK();
}
void VScanNode::release_resource(RuntimeState* state) {
if (_scanner_ctx) {
- if (!state->enable_pipeline_exec()) {
+ if (!state->enable_pipeline_exec() || _should_create_scanner) {
// stop and wait the scanner scheduler to be done
// _scanner_ctx may not be created for some short circuit case.
- _scanner_ctx->set_should_stop();
- _scanner_ctx->clear_and_join(this, state);
- } else if (_should_create_scanner) {
- _scanner_ctx->clear_and_join(this, state);
+ _scanner_ctx->stop_scanners(state);
}
}
-
+ _scanners.clear();
ExecNode::release_resource(state);
}
-Status VScanNode::try_close(RuntimeState* state) {
- if (_scanner_ctx) {
- // mark this scanner ctx as should_stop to make sure scanners will not
be scheduled anymore
- // TODO: there is a lock in `set_should_stop` may cause some slight
impact
- _scanner_ctx->set_should_stop();
- }
- return Status::OK();
-}
-
Status VScanNode::_normalize_conjuncts() {
// The conjuncts is always on output tuple, so use _output_tuple_desc;
std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots();
@@ -1329,11 +1317,15 @@ VScanNode::PushDownType
VScanNode::_should_push_down_in_predicate(VInPredicate*
Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) {
std::list<VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
+ // Init scanner wrapper
+ for (auto it = scanners.begin(); it != scanners.end(); ++it) {
+ _scanners.emplace_back(std::make_shared<ScannerDelegate>(*it));
+ }
if (scanners.empty()) {
_eos = true;
} else {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
- RETURN_IF_ERROR(_start_scanners(scanners,
query_parallel_instance_num));
+ _start_scanners(_scanners, query_parallel_instance_num);
}
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index 5917d0ff46b..d4a054cacd5 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -87,6 +87,16 @@ struct FilterPredicates {
std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>>
in_filters;
};
+// We want to close the scanner automatically, so we use a delegate class
+// and call the close method in the delegate class's destructor.
+class ScannerDelegate {
+public:
+ VScannerSPtr _scanner;
+ ScannerDelegate(VScannerSPtr& scanner_ptr) : _scanner(scanner_ptr) {}
+ ~ScannerDelegate() {
static_cast<void>(_scanner->close(_scanner->runtime_state())); }
+ ScannerDelegate(ScannerDelegate&&) = delete;
+};
+
class VScanNode : public ExecNode, public RuntimeFilterConsumer {
public:
VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl&
descs)
@@ -156,8 +166,6 @@ public:
Status alloc_resource(RuntimeState* state) override;
void release_resource(RuntimeState* state) override;
- Status try_close(RuntimeState* state);
-
bool should_run_serial() const {
return _should_run_serial || _state->enable_scan_node_run_serial();
}
@@ -262,8 +270,11 @@ protected:
int _max_scan_key_num;
int _max_pushdown_conditions_per_column;
- // Each scan node will generates a ScannerContext to manage all Scanners.
- // See comments of ScannerContext for more details
+ // ScanNode owns the scanners; the scanner context only holds their
weak pointers
+ std::list<std::shared_ptr<ScannerDelegate>> _scanners;
+
+ // Each scan node generates a ScannerContext to do the scheduling work.
+ // The ScannerContext will be added to the scanner scheduler.
std::shared_ptr<ScannerContext> _scanner_ctx = nullptr;
// indicate this scan node has no more data to return
@@ -437,8 +448,8 @@ private:
const std::string& fn_name, int
slot_ref_child = -1);
// Submit the scanner to the thread pool and start execution
- Status _start_scanners(const std::list<VScannerSPtr>& scanners,
- const int query_parallel_instance_num);
+ void _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>&
scanners,
+ const int query_parallel_instance_num);
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 29daf9a68c5..6046d87ac91 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -145,16 +145,6 @@ public:
void set_status_on_failure(const Status& st) { _status = st; }
- // return false if _is_counted_down is already true,
- // otherwise, set _is_counted_down to true and return true.
- bool set_counted_down() {
- if (_is_counted_down) {
- return false;
- }
- _is_counted_down = true;
- return true;
- }
-
protected:
void _discard_conjuncts() {
for (auto& conjunct : _conjuncts) {
@@ -215,8 +205,6 @@ protected:
int64_t _scan_cpu_timer = 0;
bool _is_load = false;
- // set to true after decrease the "_num_unfinished_scanners" in scanner
context
- bool _is_counted_down = false;
bool _is_init = true;
@@ -227,6 +215,5 @@ protected:
};
using VScannerSPtr = std::shared_ptr<VScanner>;
-using VScannerWPtr = std::weak_ptr<VScanner>;
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]