This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fbcf3380971 [refactor](scanner) refactoring and optimizing scanner scheduling (#30746)
fbcf3380971 is described below
commit fbcf33809719140253a40c913c480f5d0f103830
Author: Ashin Gau <[email protected]>
AuthorDate: Wed Feb 7 18:08:24 2024 +0800
[refactor](scanner) refactoring and optimizing scanner scheduling (#30746)
---
be/src/pipeline/exec/file_scan_operator.cpp | 8 +-
be/src/pipeline/exec/scan_operator.cpp | 27 +-
be/src/pipeline/exec/scan_operator.h | 3 +-
be/src/runtime/runtime_state.h | 8 +-
be/src/vec/exec/scan/new_file_scan_node.cpp | 9 +-
be/src/vec/exec/scan/pip_scanner_context.h | 242 +---------
be/src/vec/exec/scan/scanner_context.cpp | 515 +++++++++------------
be/src/vec/exec/scan/scanner_context.h | 213 ++++-----
be/src/vec/exec/scan/scanner_scheduler.cpp | 297 ++++--------
be/src/vec/exec/scan/scanner_scheduler.h | 44 +-
be/src/vec/exec/scan/vscan_node.cpp | 25 +-
be/src/vec/exec/scan/vscan_node.h | 3 +-
.../java/org/apache/doris/qe/SessionVariable.java | 42 +-
gensrc/thrift/PaloInternalService.thrift | 2 +
14 files changed, 499 insertions(+), 939 deletions(-)
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index 51fa60f067d..ac193147dfb 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -38,8 +38,10 @@ Status
FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
}
auto& p = _parent->cast<FileScanOperatorX>();
- size_t shard_num =
- std::min<size_t>(config::doris_scanner_thread_pool_thread_num,
_scan_ranges.size());
+ size_t shard_num = std::min<size_t>(
+ config::doris_scanner_thread_pool_thread_num /
state()->query_parallel_instance_num(),
+ _scan_ranges.size());
+ shard_num = std::max(shard_num, (size_t)1);
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
for (auto& scan_range : _scan_ranges) {
std::unique_ptr<vectorized::VFileScanner> scanner =
vectorized::VFileScanner::create_unique(
@@ -62,7 +64,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>&
scan_ranges) {
int max_scanners =
config::doris_scanner_thread_pool_thread_num /
state->query_parallel_instance_num();
- max_scanners = max_scanners == 0 ? 1 : max_scanners;
+ max_scanners = std::max(std::max(max_scanners,
state->parallel_scan_max_scanners_count()), 1);
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
max_scanners = 1;
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index d9fced39b05..f19bed90a9e 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -160,7 +160,6 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
if (_scanner_ctx) {
DCHECK(!_eos && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
-
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
}
_opened = true;
return status;
@@ -1288,16 +1287,14 @@ Status ScanLocalState<Derived>::_init_profile() {
profile()->add_child(_scanner_profile.get(), true, nullptr);
_memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile,
"MemoryUsage", 1);
- _queued_blocks_memory_usage = _scanner_profile->AddHighWaterMarkCounter(
- "QueuedBlocks", TUnit::BYTES, "MemoryUsage", 1);
_free_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks",
TUnit::BYTES, "MemoryUsage", 1);
_newly_create_free_blocks_num =
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum",
TUnit::UNIT);
+ _scale_up_scanners_counter = ADD_COUNTER(_scanner_profile,
"NumScaleUpScanners", TUnit::UNIT);
// time of transfer thread to wait for block from scan thread
_scanner_wait_batch_timer = ADD_TIMER(_scanner_profile,
"ScannerBatchWaitTime");
_scanner_sched_counter = ADD_COUNTER(_scanner_profile,
"ScannerSchedCount", TUnit::UNIT);
- _scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile,
"ScannerCtxSchedCount", TUnit::UNIT);
_scanner_ctx_sched_time = ADD_TIMER(_scanner_profile,
"ScannerCtxSchedTime");
_scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime");
@@ -1456,14 +1453,10 @@ Status
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
}};
if (state->is_cancelled()) {
- // ISSUE: https://github.com/apache/doris/issues/16360
- // _scanner_ctx may be null here, see: `VScanNode::alloc_resource`
(_eos == null)
if (local_state._scanner_ctx) {
-
local_state._scanner_ctx->set_status_on_error(Status::Cancelled("query
cancelled"));
- return local_state._scanner_ctx->status();
- } else {
- return Status::Cancelled("query cancelled");
+ local_state._scanner_ctx->stop_scanners(state);
}
+ return Status::Cancelled("Query cancelled in ScanOperator");
}
if (local_state._eos) {
@@ -1471,21 +1464,11 @@ Status
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
return Status::OK();
}
- vectorized::BlockUPtr scan_block = nullptr;
bool eos = false;
- RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state,
&scan_block, &eos, 0));
- if (eos) {
- source_state = SourceState::FINISHED;
- DCHECK(scan_block == nullptr);
- return Status::OK();
- }
-
- // get scanner's block memory
- block->swap(*scan_block);
- local_state._scanner_ctx->return_free_block(std::move(scan_block));
+ RETURN_IF_ERROR(local_state._scanner_ctx->get_block_from_queue(state,
block, &eos, 0));
local_state.reached_limit(block, source_state);
- if (eos) {
+ if (eos || source_state == SourceState::FINISHED) {
source_state = SourceState::FINISHED;
// reach limit, stop the scanners.
local_state._scanner_ctx->stop_scanners(state);
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 8fac0b946ea..add2249276f 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -142,7 +142,6 @@ protected:
std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
- RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
@@ -160,8 +159,8 @@ protected:
// time of filter output block from scanner
RuntimeProfile::Counter* _filter_timer = nullptr;
RuntimeProfile::Counter* _memory_usage_counter = nullptr;
- RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage =
nullptr;
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
+ RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
// rows read from the scanner (including those discarded by (pre)filters)
RuntimeProfile::Counter* _rows_read_counter = nullptr;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 91443ef9492..38053d3cb68 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -128,7 +128,13 @@ public:
:
_query_options.query_timeout;
}
int max_io_buffers() const { return _query_options.max_io_buffers; }
- int num_scanner_threads() const { return
_query_options.num_scanner_threads; }
+ int num_scanner_threads() const {
+ return _query_options.__isset.num_scanner_threads ?
_query_options.num_scanner_threads : 0;
+ }
+ double scanner_scale_up_ratio() const {
+ return _query_options.__isset.scanner_scale_up_ratio ?
_query_options.scanner_scale_up_ratio
+ : 0;
+ }
TQueryType::type query_type() const { return _query_options.query_type; }
int64_t timestamp_ms() const { return _timestamp_ms; }
int32_t nano_seconds() const { return _nano_seconds; }
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index da33538b8c3..2ce80f4463a 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -62,7 +62,7 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>&
scan_ranges) {
int max_scanners =
config::doris_scanner_thread_pool_thread_num /
state->query_parallel_instance_num();
- max_scanners = max_scanners == 0 ? 1 : max_scanners;
+ max_scanners = std::max(std::max(max_scanners,
state->parallel_scan_max_scanners_count()), 1);
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
max_scanners = 1;
@@ -116,9 +116,10 @@ Status
NewFileScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
return Status::OK();
}
- // TODO: determine kv cache shard num
- size_t shard_num =
- std::min<size_t>(config::doris_scanner_thread_pool_thread_num,
_scan_ranges.size());
+ size_t shard_num = std::min<size_t>(
+ config::doris_scanner_thread_pool_thread_num /
_state->query_parallel_instance_num(),
+ _scan_ranges.size());
+ shard_num = std::max(shard_num, (size_t)1);
_kv_cache.reset(new ShardedKVCache(shard_num));
for (auto& scan_range : _scan_ranges) {
std::unique_ptr<VFileScanner> scanner =
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index 62f6f9edb21..b69f7c031d4 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -36,129 +36,6 @@ public:
: vectorized::ScannerContext(state, parent, output_tuple_desc,
output_row_descriptor,
scanners, limit_,
max_bytes_in_blocks_queue,
num_parallel_instances) {}
-
- Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr*
block, bool* eos,
- int id) override {
- {
- std::unique_lock l(_transfer_lock);
- if (state->is_cancelled()) {
- set_status_on_error(Status::Cancelled("cancelled"), false);
- }
-
- if (!status().ok()) {
- return _process_status;
- }
- }
-
- std::vector<vectorized::BlockUPtr> merge_blocks;
- {
- std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
- // The pipeline maybe wake up by scanner.done. If there are still
any data
- // in the queue, should read the data first and then check if the
scanner.done
- // if done, then eos is returned to indicate that the scan
operator finished.
- if (_blocks_queues[id].empty()) {
- *eos = done();
- return Status::OK();
- }
- if (_process_status.is<ErrorCode::CANCELLED>()) {
- *eos = true;
- return Status::OK();
- }
- *block = std::move(_blocks_queues[id].front());
- _blocks_queues[id].pop_front();
-
- auto rows = (*block)->rows();
- while (!_blocks_queues[id].empty()) {
- const auto add_rows = (*_blocks_queues[id].front()).rows();
- if (rows + add_rows < state->batch_size()) {
- rows += add_rows;
-
merge_blocks.emplace_back(std::move(_blocks_queues[id].front()));
- _blocks_queues[id].pop_front();
- } else {
- break;
- }
- }
-
- if (_blocks_queues[id].empty()) {
- this->reschedule_scanner_ctx();
- }
- }
-
- _current_used_bytes -= (*block)->allocated_bytes();
- if (!merge_blocks.empty()) {
- vectorized::MutableBlock m(block->get());
- for (auto& merge_block : merge_blocks) {
- _current_used_bytes -= merge_block->allocated_bytes();
- static_cast<void>(m.merge(*merge_block));
- return_free_block(std::move(merge_block));
- }
- (*block)->set_columns(std::move(m.mutable_columns()));
- }
-
- // after return free blocks, should try to reschedule the scanner
- if (should_be_scheduled()) {
- this->reschedule_scanner_ctx();
- }
-
- return Status::OK();
- }
-
- void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks)
override {
- const int queue_size = _blocks_queues.size();
- const int block_size = blocks.size();
- if (block_size == 0) {
- return;
- }
- int64_t local_bytes = 0;
-
- for (const auto& block : blocks) {
- auto st = validate_block_schema(block.get());
- if (!st.ok()) {
- set_status_on_error(st, false);
- }
- local_bytes += block->allocated_bytes();
- }
-
- for (int i = 0; i < queue_size && i < block_size; ++i) {
- int queue = _next_queue_to_feed;
- {
- std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
- for (int j = i; j < block_size; j += queue_size) {
- _blocks_queues[queue].emplace_back(std::move(blocks[j]));
- }
- }
- _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
- }
- _current_used_bytes += local_bytes;
- }
-
- bool empty_in_queue(int id) override {
- std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
- return _blocks_queues[id].empty();
- }
-
- Status init() override {
- for (int i = 0; i < _num_parallel_instances; ++i) {
- _queue_mutexs.emplace_back(std::make_unique<std::mutex>());
- _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
- }
- return ScannerContext::init();
- }
-
- std::string debug_string() override {
- auto res = ScannerContext::debug_string();
- for (int i = 0; i < _blocks_queues.size(); ++i) {
- res += " queue " + std::to_string(i) + ":size " +
- std::to_string(_blocks_queues[i].size());
- }
- return res;
- }
-
-protected:
- int _next_queue_to_feed = 0;
- std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
- std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
- std::atomic_int64_t _current_used_bytes = 0;
};
class PipXScannerContext final : public vectorized::ScannerContext {
@@ -172,117 +49,38 @@ public:
int64_t limit_, int64_t max_bytes_in_blocks_queue,
std::shared_ptr<pipeline::ScanDependency> dependency)
: vectorized::ScannerContext(state, output_tuple_desc,
output_row_descriptor, scanners,
- limit_, max_bytes_in_blocks_queue, 1,
local_state,
- dependency) {}
- Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr*
block, bool* eos,
- int id) override {
- if (_blocks_queue_buffered.empty()) {
- std::unique_lock l(_transfer_lock);
- if (state->is_cancelled()) {
- set_status_on_error(Status::Cancelled("cancelled"), false);
- }
-
- if (!status().ok()) {
- return _process_status;
- }
-
- if (_blocks_queue.empty()) {
- *eos = done();
- return Status::OK();
- }
- if (_process_status.is<ErrorCode::CANCELLED>()) {
- *eos = true;
- return Status::OK();
- }
-
- _blocks_queue_buffered = std::move(_blocks_queue);
- }
-
- // `get_block_from_queue` should not be called concurrently from
multiple threads,
- // so here no need to lock.
- *block = std::move(_blocks_queue_buffered.front());
- _blocks_queue_buffered.pop_front();
+ limit_, max_bytes_in_blocks_queue, 1,
local_state) {
+ _dependency = dependency;
+ }
- std::vector<vectorized::BlockUPtr> merge_blocks;
- auto rows = (*block)->rows();
- while (!_blocks_queue_buffered.empty()) {
- const auto add_rows = (*_blocks_queue_buffered.front()).rows();
- if (rows + add_rows < state->batch_size()) {
- rows += add_rows;
-
merge_blocks.emplace_back(std::move(_blocks_queue_buffered.front()));
- _blocks_queue_buffered.pop_front();
- } else {
- break;
- }
+ void append_block_to_queue(std::shared_ptr<vectorized::ScanTask>
scan_task) override {
+ vectorized::ScannerContext::append_block_to_queue(scan_task);
+ if (_dependency) {
+ _dependency->set_ready();
}
+ }
- if (_blocks_queue_buffered.empty()) {
- std::unique_lock l(_transfer_lock);
- if (_blocks_queue.empty()) {
- this->reschedule_scanner_ctx();
+ Status get_block_from_queue(RuntimeState* state, vectorized::Block* block,
bool* eos, int id,
+ bool wait = true) override {
+ Status st = vectorized::ScannerContext::get_block_from_queue(state,
block, eos, id, wait);
+ std::lock_guard<std::mutex> l(_transfer_lock);
+ if (_blocks_queue.empty()) {
+ if (_dependency) {
_dependency->block();
- } else {
- _blocks_queue_buffered = std::move(_blocks_queue);
- }
- }
-
- _cur_bytes_in_queue -= (*block)->allocated_bytes();
- if (!merge_blocks.empty()) {
- vectorized::MutableBlock m(block->get());
- for (auto& merge_block : merge_blocks) {
- _cur_bytes_in_queue -= merge_block->allocated_bytes();
- static_cast<void>(m.merge(*merge_block));
- if (merge_block->mem_reuse()) {
- _free_blocks_buffered.emplace_back(std::move(merge_block));
- }
}
- (*block)->set_columns(std::move(m.mutable_columns()));
- }
- return_free_blocks();
-
- // after return free blocks, should try to reschedule the scanner
- if (should_be_scheduled()) {
- this->reschedule_scanner_ctx();
}
-
- return Status::OK();
+ return st;
}
- void reschedule_scanner_ctx() override {
- if (done()) {
- return;
- }
- auto state = _scanner_scheduler->submit(shared_from_this());
- //todo(wb) rethinking is it better to mark current scan_context failed
when submit failed many times?
- if (state.ok()) {
- _num_scheduling_ctx++;
- } else {
- set_status_on_error(state, false);
+protected:
+ void _set_scanner_done() override {
+ if (_dependency) {
+ _dependency->set_scanner_done();
}
}
private:
- void return_free_blocks() {
- if (_free_blocks_buffered.empty()) {
- return;
- }
-
- size_t total_bytes = 0;
- for (auto& block : _free_blocks_buffered) {
- const auto bytes = block->allocated_bytes();
- block->clear_column_data();
- _estimated_block_bytes = std::max(bytes, (size_t)16);
- total_bytes += bytes;
- }
- _free_blocks_memory_usage->add(total_bytes);
- const auto count = _free_blocks_buffered.size();
-
_free_blocks.enqueue_bulk(std::make_move_iterator(_free_blocks_buffered.begin()),
count);
- _free_blocks_buffered.clear();
- _serving_blocks_num -= count;
- }
-
- std::vector<vectorized::BlockUPtr> _free_blocks_buffered;
- std::list<vectorized::BlockUPtr> _blocks_queue_buffered;
+ std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
};
} // namespace doris::pipeline
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index be143b9f729..45e934ee790 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -17,12 +17,10 @@
#include "scanner_context.h"
-#include <bthread/bthread.h>
#include <fmt/format.h>
#include <gen_cpp/Metrics_types.h>
#include <glog/logging.h>
-#include <algorithm>
#include <mutex>
#include <ostream>
#include <utility>
@@ -31,64 +29,57 @@
#include "common/status.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
-#include "util/pretty_printer.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
-#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/exec/scan/vscan_node.h"
-#include "vec/exec/scan/vscanner.h"
namespace doris::vectorized {
using namespace std::chrono_literals;
-static bvar::Status<int64_t>
g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0);
-static bvar::Status<int64_t>
g_num_running_scanners("doris_num_running_scanners", 0);
-
ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor*
output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
int64_t limit_, int64_t
max_bytes_in_blocks_queue,
const int num_parallel_instances,
- pipeline::ScanLocalStateBase* local_state,
- std::shared_ptr<pipeline::ScanDependency>
dependency)
+ pipeline::ScanLocalStateBase* local_state)
: HasTaskExecutionCtx(state),
_state(state),
- _parent(nullptr),
_local_state(local_state),
_output_tuple_desc(output_row_descriptor
?
output_row_descriptor->tuple_descriptors().front()
: output_tuple_desc),
_output_row_descriptor(output_row_descriptor),
- _process_status(Status::OK()),
_batch_size(state->batch_size()),
limit(limit_),
_max_bytes_in_queue(std::max(max_bytes_in_blocks_queue,
(int64_t)1024) *
num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
- _scanners(scanners.begin(), scanners.end()),
_all_scanners(scanners.begin(), scanners.end()),
- _num_parallel_instances(num_parallel_instances),
- _dependency(dependency) {
+ _num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
- if (_scanners.empty()) {
+ // Provide more memory for wide tables, increase proportionally by
multiples of 300
+ _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
+ if (scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}
+ _scanners.enqueue_bulk(scanners.begin(), scanners.size());
if (limit < 0) {
limit = -1;
}
- _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
+ MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
+ _max_thread_num = _state->num_scanner_threads() > 0
+ ? _state->num_scanner_threads()
+ : config::doris_scanner_thread_pool_thread_num /
+ state->query_parallel_instance_num();
_max_thread_num *= num_parallel_instances;
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
- DCHECK(_max_thread_num > 0);
- _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
+ _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
// 1. Calculate max concurrency
// For select * from table limit 10; should just use one thread.
if ((_parent && _parent->should_run_serial()) ||
@@ -104,45 +95,9 @@ ScannerContext::ScannerContext(doris::RuntimeState* state,
doris::vectorized::VS
int64_t limit_, int64_t
max_bytes_in_blocks_queue,
const int num_parallel_instances,
pipeline::ScanLocalStateBase* local_state)
- : HasTaskExecutionCtx(state),
- _state(state),
- _parent(parent),
- _local_state(local_state),
- _output_tuple_desc(output_row_descriptor
- ?
output_row_descriptor->tuple_descriptors().front()
- : output_tuple_desc),
- _output_row_descriptor(output_row_descriptor),
- _process_status(Status::OK()),
- _batch_size(state->batch_size()),
- limit(limit_),
- _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue,
(int64_t)1024) *
- num_parallel_instances),
- _scanner_scheduler(state->exec_env()->scanner_scheduler()),
- _scanners(scanners.begin(), scanners.end()),
- _all_scanners(scanners.begin(), scanners.end()),
- _num_parallel_instances(num_parallel_instances) {
- DCHECK(_output_row_descriptor == nullptr ||
- _output_row_descriptor->tuple_descriptors().size() == 1);
- _query_id = _state->get_query_ctx()->query_id();
- ctx_id = UniqueId::gen_uid().to_string();
- if (_scanners.empty()) {
- _is_finished = true;
- _set_scanner_done();
- }
- if (limit < 0) {
- limit = -1;
- }
- _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
- _max_thread_num *= num_parallel_instances;
- _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
- DCHECK(_max_thread_num > 0);
- _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
- // 1. Calculate max concurrency
- // For select * from table limit 10; should just use one thread.
- if ((_parent && _parent->should_run_serial()) ||
- (_local_state && _local_state->should_run_serial())) {
- _max_thread_num = 1;
- }
+ : ScannerContext(state, output_tuple_desc, output_row_descriptor,
scanners, limit_,
+ max_bytes_in_blocks_queue, num_parallel_instances,
local_state) {
+ _parent = parent;
}
// After init function call, should not access _parent
@@ -150,43 +105,21 @@ Status ScannerContext::init() {
if (_parent) {
_scanner_profile = _parent->_scanner_profile;
_scanner_sched_counter = _parent->_scanner_sched_counter;
- _scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
- _scanner_ctx_sched_time = _parent->_scanner_ctx_sched_time;
- _free_blocks_memory_usage = _parent->_free_blocks_memory_usage;
_newly_create_free_blocks_num = _parent->_newly_create_free_blocks_num;
- _queued_blocks_memory_usage = _parent->_queued_blocks_memory_usage;
_scanner_wait_batch_timer = _parent->_scanner_wait_batch_timer;
+ _free_blocks_memory_usage_mark = _parent->_free_blocks_memory_usage;
+ _scanner_ctx_sched_time = _parent->_scanner_ctx_sched_time;
+ _scale_up_scanners_counter = _parent->_scale_up_scanners_counter;
} else {
_scanner_profile = _local_state->_scanner_profile;
_scanner_sched_counter = _local_state->_scanner_sched_counter;
- _scanner_ctx_sched_counter = _local_state->_scanner_ctx_sched_counter;
- _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
- _free_blocks_memory_usage = _local_state->_free_blocks_memory_usage;
_newly_create_free_blocks_num =
_local_state->_newly_create_free_blocks_num;
- _queued_blocks_memory_usage =
_local_state->_queued_blocks_memory_usage;
_scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
+ _free_blocks_memory_usage_mark =
_local_state->_free_blocks_memory_usage;
+ _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
+ _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
}
- // 2. Calculate the number of free blocks that all scanners can use.
- // The calculation logic is as follows:
- // 1. Assuming that at most M rows can be scanned in one
scan(config::doris_scanner_row_num),
- // then figure out how many blocks are required for one
scan(_block_per_scanner).
- // 2. The maximum number of concurrency * the blocks required for one
scan,
- // that is, the number of blocks that all scanners can use.
- auto doris_scanner_row_num =
- limit == -1 ? config::doris_scanner_row_num
- :
std::min(static_cast<int64_t>(config::doris_scanner_row_num), limit);
- int real_block_size =
- limit == -1 ? _batch_size :
std::min(static_cast<int64_t>(_batch_size), limit);
- _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) /
real_block_size;
- _free_blocks_capacity = _max_thread_num * _block_per_scanner;
- auto block = get_free_block();
- _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
- int min_blocks = (config::min_bytes_in_scanner_queue +
_estimated_block_bytes - 1) /
- _estimated_block_bytes;
- _free_blocks_capacity = std::max(_free_blocks_capacity, min_blocks);
- return_free_block(std::move(block));
-
#ifndef BE_TEST
// 3. get thread token
if (_state->get_query_ctx()) {
@@ -198,8 +131,6 @@ Status ScannerContext::init() {
}
#endif
- _num_unfinished_scanners = _scanners.size();
-
if (_parent) {
COUNTER_SET(_parent->_max_scanner_thread_num,
(int64_t)_max_thread_num);
_parent->_runtime_profile->add_info_string("UseSpecificThreadToken",
@@ -210,6 +141,17 @@ Status ScannerContext::init() {
thread_token ==
nullptr ? "False" : "True");
}
+ // submit `_max_thread_num` running scanners to `ScannerScheduler`
+ // When a running scanners is finished, it will submit one of the
remaining scanners.
+ for (int i = 0; i < _max_thread_num; ++i) {
+ std::weak_ptr<ScannerDelegate> next_scanner;
+ if (_scanners.try_dequeue(next_scanner)) {
+ vectorized::BlockUPtr block = get_free_block();
+ submit_scan_task(std::make_shared<ScanTask>(next_scanner,
std::move(block)));
+ _num_running_scanners++;
+ }
+ }
+
return Status::OK();
}
@@ -220,138 +162,221 @@ std::string ScannerContext::parent_name() {
vectorized::BlockUPtr ScannerContext::get_free_block() {
vectorized::BlockUPtr block;
if (_free_blocks.try_dequeue(block)) {
+ std::lock_guard<std::mutex> fl(_free_blocks_lock);
DCHECK(block->mem_reuse());
- _free_blocks_memory_usage->add(-block->allocated_bytes());
- _serving_blocks_num++;
+ _free_blocks_memory_usage -= block->allocated_bytes();
+ _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
return block;
}
- block = vectorized::Block::create_unique(_output_tuple_desc->slots(),
_batch_size,
- true /*ignore invalid slots*/);
-
- COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
-
- _serving_blocks_num++;
- return block;
+ _newly_create_free_blocks_num->update(1);
+ return vectorized::Block::create_unique(_output_tuple_desc->slots(),
_batch_size,
+ true /*ignore invalid slots*/);
}
-void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block>
block) {
- _serving_blocks_num--;
- if (block->mem_reuse()) {
- // Only put blocks with schema to free blocks, because colocate blocks
- // need schema.
- _estimated_block_bytes = std::max(block->allocated_bytes(),
(size_t)16);
+void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
+ std::lock_guard<std::mutex> fl(_free_blocks_lock);
+ if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue)
{
block->clear_column_data();
- _free_blocks_memory_usage->add(block->allocated_bytes());
+ _free_blocks_memory_usage += block->allocated_bytes();
+ _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
_free_blocks.enqueue(std::move(block));
}
}
-void
ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
blocks) {
- std::lock_guard l(_transfer_lock);
- auto old_bytes_in_queue = _cur_bytes_in_queue;
- for (auto& b : blocks) {
- auto st = validate_block_schema(b.get());
+bool ScannerContext::empty_in_queue(int id) {
+ std::lock_guard<std::mutex> l(_transfer_lock);
+ return _blocks_queue.empty();
+}
+
+void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
+ _scanner_sched_counter->update(1);
+ _num_scheduled_scanners++;
+ _scanner_scheduler->submit(shared_from_this(), scan_task);
+}
+
+void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask>
scan_task) {
+ if (scan_task->status_ok() && scan_task->current_block->rows() > 0) {
+ Status st = validate_block_schema(scan_task->current_block.get());
if (!st.ok()) {
- set_status_on_error(st, false);
+ scan_task->set_status(st);
}
- _cur_bytes_in_queue += b->allocated_bytes();
- _blocks_queue.push_back(std::move(b));
}
- blocks.clear();
- if (_dependency) {
- _dependency->set_ready();
+ std::lock_guard<std::mutex> l(_transfer_lock);
+ if (!scan_task->status_ok()) {
+ _process_status = scan_task->get_status();
+ }
+ if (_last_scale_up_time == 0) {
+ _last_scale_up_time = UnixMillis();
+ }
+ if (_blocks_queue.empty() && _last_fetch_time != 0) {
+ // there's no block in queue before current block, so the consumer is
waiting
+ _total_wait_block_time += UnixMillis() - _last_fetch_time;
}
+ _num_scheduled_scanners--;
+ _blocks_queue.emplace_back(scan_task);
_blocks_queue_added_cv.notify_one();
- _queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
- g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
}
-bool ScannerContext::empty_in_queue(int id) {
+Status ScannerContext::get_block_from_queue(RuntimeState* state,
vectorized::Block* block,
+ bool* eos, int id, bool wait) {
+ if (state->is_cancelled()) {
+ _set_scanner_done();
+ return Status::Cancelled("Query cancelled in ScannerContext");
+ }
 std::unique_lock l(_transfer_lock);
- return _blocks_queue.empty();
-}
+ // Wait for block from queue
+ if (wait) {
+ // scanner batch wait time
+ SCOPED_TIMER(_scanner_wait_batch_timer);
+ while (!done() && _blocks_queue.empty() && _process_status.ok()) {
+ _blocks_queue_added_cv.wait_for(l, 1s);
+ }
+ }
+ if (!_process_status.ok()) {
+ _set_scanner_done();
+ return _process_status;
+ }
+ std::shared_ptr<ScanTask> scan_task = nullptr;
+ if (!_blocks_queue.empty() && !done()) {
+ _last_fetch_time = UnixMillis();
+ scan_task = _blocks_queue.front();
+ _blocks_queue.pop_front();
+ }
-Status ScannerContext::get_block_from_queue(RuntimeState* state,
vectorized::BlockUPtr* block,
- bool* eos, int id) {
- std::vector<vectorized::BlockUPtr> merge_blocks;
- {
- std::unique_lock l(_transfer_lock);
- // Normally, the scanner scheduler will schedule ctx.
- // But when the amount of data in the blocks queue exceeds the upper
limit,
- // the scheduler will stop scheduling.
- // (if the scheduler continues to schedule, it will cause a lot of
busy running).
- // At this point, consumers are required to trigger new scheduling to
ensure that
- // data can be continuously fetched.
- bool to_be_schedule = should_be_scheduled();
-
- bool is_scheduled = false;
- if (!done() && to_be_schedule && _num_running_scanners == 0) {
- is_scheduled = true;
- auto submit_status =
_scanner_scheduler->submit(shared_from_this());
- if (!submit_status.ok()) {
- set_status_on_error(submit_status, false);
+ if (scan_task) {
+ if (!scan_task->status_ok()) {
+ _set_scanner_done();
+ return scan_task->get_status();
+ }
+ // We can only know the block size after reading at least one block
+ // Just take the size of first block as `_estimated_block_size`
+ if (scan_task->first_block) {
+ std::lock_guard<std::mutex> fl(_free_blocks_lock);
+ size_t block_size = scan_task->current_block->allocated_bytes();
+ _free_blocks_memory_usage += block_size;
+ _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+ scan_task->first_block = false;
+ if (block_size > _estimated_block_size) {
+ _estimated_block_size = block_size;
 }
 }
-
- // Wait for block from queue
- {
- SCOPED_TIMER(_scanner_wait_batch_timer);
- // scanner batch wait time
- while (!(!_blocks_queue.empty() || done() || !status().ok() ||
state->is_cancelled())) {
- if (!is_scheduled && _num_running_scanners == 0 &&
should_be_scheduled()) {
- LOG(INFO) << debug_string();
+ // consume current block
+ block->swap(*scan_task->current_block);
+ if (!scan_task->current_block->mem_reuse()) {
+ // it depends on the memory strategy of ScanNode/ScanOperator
+ // we should double check `mem_reuse()` of `current_block` to make
sure it can be reused
+ _newly_create_free_blocks_num->update(1);
+ scan_task->current_block =
vectorized::Block::create_unique(_output_tuple_desc->slots(),
+
_batch_size, true);
+ }
+ if (scan_task->is_eos()) { // current scanner is finished, and no more
data to read
+ _num_finished_scanners++;
+ std::weak_ptr<ScannerDelegate> next_scanner;
+ // submit one of the remaining scanners
+ if (_scanners.try_dequeue(next_scanner)) {
+ // reuse current running scanner, just reset some states.
+ scan_task->reuse_scanner(next_scanner);
+ submit_scan_task(scan_task);
+ } else {
+ // no more scanner to be scheduled
+ // `_free_blocks` serve all running scanners, maybe it's too
large for the remaining scanners
+ int free_blocks_for_each = _free_blocks.size_approx() /
_num_running_scanners;
+ _num_running_scanners--;
+ std::lock_guard<std::mutex> fl(_free_blocks_lock);
+ for (int i = 0; i < free_blocks_for_each; ++i) {
+ vectorized::BlockUPtr removed_block;
+ if (_free_blocks.try_dequeue(removed_block)) {
+ _free_blocks_memory_usage -= removed_block->allocated_bytes(); // account for the block actually dropped from `_free_blocks`, not the caller's output `block`
+
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+ }
 }
- _blocks_queue_added_cv.wait_for(l, 1s);
 }
+ } else {
+ // resubmit current running scanner to read the next block
+ submit_scan_task(scan_task);
 }
+ // scale up
+ _try_to_scale_up();
+ }
- if (state->is_cancelled()) {
- set_status_on_error(Status::Cancelled("cancelled"), false);
- }
+ if (_num_finished_scanners == _all_scanners.size() &&
_blocks_queue.empty()) {
+ _set_scanner_done();
+ _is_finished = true;
+ }
+ *eos = done();
+ return Status::OK();
+}
- if (!status().ok()) {
- return status();
+void ScannerContext::_try_to_scale_up() {
+ // Four criteria to determine whether to increase the parallelism of the
scanners
+ // 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
+ // 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get
blocks
+ // 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough
memory to scale up
+ // 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
+ if (MAX_SCALE_UP_RATIO > 0 && _scanners.size_approx() > 0 &&
+ (_num_running_scanners < _max_thread_num * MAX_SCALE_UP_RATIO) &&
+ (_last_fetch_time - _last_scale_up_time > SCALE_UP_DURATION) && //
duration > 5000ms
+ (_total_wait_block_time > (_last_fetch_time - _last_scale_up_time) *
+ WAIT_BLOCK_DURATION_RATIO)) { // too
large lock time
+ double wait_ratio =
+ (double)_total_wait_block_time / (_last_fetch_time -
_last_scale_up_time);
+ if (_last_wait_duration_ratio > 0 && wait_ratio >
_last_wait_duration_ratio * 0.8) {
+ // when _last_wait_duration_ratio > 0, it has scaled up before.
+ // we need to determine if the scale-up is effective:
+ // the wait duration ratio after last scaling up should less than
80% of `_last_wait_duration_ratio`
+ return;
}
- if (!_blocks_queue.empty()) {
- *block = std::move(_blocks_queue.front());
- _blocks_queue.pop_front();
- auto block_bytes = (*block)->allocated_bytes();
- _cur_bytes_in_queue -= block_bytes;
- _queued_blocks_memory_usage->add(-block_bytes);
-
- auto rows = (*block)->rows();
- while (!_blocks_queue.empty()) {
- auto& add_block = _blocks_queue.front();
- const auto add_rows = (*add_block).rows();
- if (rows + add_rows < state->batch_size()) {
- rows += add_rows;
- block_bytes = (*add_block).allocated_bytes();
- _cur_bytes_in_queue -= block_bytes;
- _queued_blocks_memory_usage->add(-block_bytes);
- merge_blocks.emplace_back(std::move(add_block));
- _blocks_queue.pop_front();
+ std::lock_guard<std::mutex> fl(_free_blocks_lock);
+ bool is_scale_up = false;
+ // calculate the number of scanners that can be scheduled
+ int num_add = std::min(_num_running_scanners * SCALE_UP_RATIO,
+ _max_thread_num * MAX_SCALE_UP_RATIO -
_num_running_scanners);
+ num_add = std::max(num_add, 1);
+ for (int i = 0; i < num_add; ++i) {
+ vectorized::BlockUPtr allocate_block = nullptr;
+ // reuse block in `_free_blocks` firstly
+ if (!_free_blocks.try_dequeue(allocate_block)) {
+ if (_free_blocks_memory_usage < _max_bytes_in_queue) {
+ _newly_create_free_blocks_num->update(1);
+ allocate_block =
vectorized::Block::create_unique(_output_tuple_desc->slots(),
+
_batch_size, true);
+ }
+ } else {
+ // comes from `_free_blocks`, decrease first, then will be
added back.
+ _free_blocks_memory_usage -= allocate_block->allocated_bytes();
+ _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+ }
+ if (allocate_block) {
+ // get enough memory to launch one more scanner.
+ std::weak_ptr<ScannerDelegate> scale_up_scanner;
+ if (_scanners.try_dequeue(scale_up_scanner)) {
+ std::shared_ptr<ScanTask> scale_up_task =
+ std::make_shared<ScanTask>(scale_up_scanner,
std::move(allocate_block));
+ _free_blocks_memory_usage += _estimated_block_size;
+
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+ // `first_block` is used to update
`_free_blocks_memory_usage`,
+ // we have got the `_estimated_block_size`, no need for
further updates
+ scale_up_task->first_block = false;
+ submit_scan_task(scale_up_task);
+ _num_running_scanners++;
+ _scale_up_scanners_counter->update(1);
+ is_scale_up = true;
} else {
break;
}
+ } else {
+ break;
}
- } else {
- *eos = done();
}
- }
- g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
- if (!merge_blocks.empty()) {
- vectorized::MutableBlock m(block->get());
- for (auto& merge_block : merge_blocks) {
- static_cast<void>(m.merge(*merge_block));
- return_free_block(std::move(merge_block));
+ if (is_scale_up) {
+ _last_wait_duration_ratio = wait_ratio;
+ _last_scale_up_time = UnixMillis();
+ _total_wait_block_time = 0;
}
- (*block)->set_columns(std::move(m.mutable_columns()));
}
-
- return Status::OK();
}
Status ScannerContext::validate_block_schema(Block* block) {
@@ -380,29 +405,17 @@ Status ScannerContext::validate_block_schema(Block*
block) {
return Status::OK();
}
-void ScannerContext::inc_num_running_scanners(int32_t inc) {
- std::lock_guard l(_transfer_lock);
- _num_running_scanners += inc;
- g_num_running_scanners.set_value(_num_running_scanners);
-}
-
-void ScannerContext::set_status_on_error(const Status& status, bool need_lock)
{
- std::unique_lock l(_transfer_lock, std::defer_lock);
- if (need_lock) {
- l.lock();
- }
- if (this->status().ok()) {
- _process_status = status;
- _blocks_queue_added_cv.notify_one();
- _should_stop = true;
- _set_scanner_done();
- LOG(INFO) << "ctx is set status on error " << debug_string()
- << ", call stack is: " << Status::InternalError<true>("catch
error status");
- }
+void ScannerContext::set_status_on_error(const Status& status) {
+ std::lock_guard<std::mutex> l(_transfer_lock);
+ _process_status = status;
+ _blocks_queue_added_cv.notify_one();
}
void ScannerContext::stop_scanners(RuntimeState* state) {
- std::unique_lock l(_transfer_lock);
+ std::lock_guard<std::mutex> l(_transfer_lock);
+ if (_should_stop) {
+ return;
+ }
_should_stop = true;
_set_scanner_done();
for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
@@ -453,95 +466,15 @@ void ScannerContext::stop_scanners(RuntimeState* state) {
_blocks_queue_added_cv.notify_one();
}
-void ScannerContext::_set_scanner_done() {
- if (_dependency) {
- _dependency->set_scanner_done();
- }
-}
-
std::string ScannerContext::debug_string() {
return fmt::format(
- "id: {}, total scanners: {}, scanners: {}, blocks in queue: {},"
- " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {},"
+ "id: {}, total scanners: {}, blocks in queue: {},"
+ " _should_stop: {}, _is_finished: {}, free blocks: {},"
" limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
- " _block_per_scanner: {}, _cur_bytes_in_queue: {},
MAX_BYTE_OF_QUEUE: {}, "
- "num_ctx_scheduled: {}, serving_blocks_num: {},
allowed_blocks_num: {}, query_id: {}",
- ctx_id, _all_scanners.size(), _scanners.size(),
_blocks_queue.size(),
- _process_status.to_string(), _should_stop, _is_finished,
_free_blocks.size_approx(),
- limit, _num_running_scanners, _max_thread_num, _block_per_scanner,
_cur_bytes_in_queue,
- _max_bytes_in_queue, num_ctx_scheduled(), _serving_blocks_num,
allowed_blocks_num(),
- print_id(_query_id));
-}
-
-void ScannerContext::reschedule_scanner_ctx() {
- std::lock_guard l(_transfer_lock);
- if (done()) {
- return;
- }
- auto submit_status = _scanner_scheduler->submit(shared_from_this());
- //todo(wb) rethinking is it better to mark current scan_context failed
when submit failed many times?
- if (!submit_status.ok()) {
- set_status_on_error(submit_status, false);
- }
-}
-
-void
ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate>
scanner) {
- std::lock_guard l(_transfer_lock);
- // Use a transfer lock to avoid the scanner be scheduled concurrently. For
example, that after
- // calling "_scanners.push_front(scanner)", there may be other ctx in
scheduler
- // to schedule that scanner right away, and in that schedule run, the
scanner may be marked as closed
- // before we call the following if() block.
- {
- --_num_running_scanners;
- g_num_running_scanners.set_value(_num_running_scanners);
- if (scanner->_scanner->need_to_close()) {
- --_num_unfinished_scanners;
- if (_num_unfinished_scanners == 0) {
- _is_finished = true;
- _set_scanner_done();
- _blocks_queue_added_cv.notify_one();
- return;
- }
- } else {
- _scanners.push_front(scanner);
- }
- }
-
- if (should_be_scheduled()) {
- auto submit_status = _scanner_scheduler->submit(shared_from_this());
- if (!submit_status.ok()) {
- set_status_on_error(submit_status, false);
- }
- }
-}
-
-// This method is called in scanner scheduler, and task context is hold
-void ScannerContext::get_next_batch_of_scanners(
- std::list<std::weak_ptr<ScannerDelegate>>* current_run) {
- std::lock_guard l(_transfer_lock);
- // Update the sched counter for profile
- Defer defer {[&]() { _scanner_sched_counter->update(current_run->size());
}};
- // 1. Calculate how many scanners should be scheduled at this run.
- // If there are enough space in blocks queue,
- // the scanner number depends on the _free_blocks numbers
- int thread_slot_num = get_available_thread_slot_num();
-
- // 2. get #thread_slot_num scanners from ctx->scanners
- // and put them into "this_run".
- for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
- std::weak_ptr<ScannerDelegate> scanner_ref = _scanners.front();
- std::shared_ptr<ScannerDelegate> scanner = scanner_ref.lock();
- _scanners.pop_front();
- if (scanner == nullptr) {
- continue;
- }
- if (scanner->_scanner->need_to_close()) {
- static_cast<void>(scanner->_scanner->close(_state));
- } else {
- current_run->push_back(scanner_ref);
- i++;
- }
- }
+ " _max_bytes_in_queue: {}, query_id: {}",
+ ctx_id, _all_scanners.size(), _blocks_queue.size(), _should_stop,
_is_finished,
+ _free_blocks.size_approx(), limit, _num_scheduled_scanners,
_max_thread_num,
+ _max_bytes_in_queue, print_id(_query_id));
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 4d936d72a13..58e6f4dae7f 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -58,6 +58,47 @@ class VScanNode;
class ScannerScheduler;
class SimplifiedScanScheduler;
+// One unit of scanning work: a scanner plus the block it fills in a scan round.
+// It is handed to ScannerScheduler through ScannerContext::submit_scan_task() and
+// later appended to ScannerContext::_blocks_queue together with its cached block.
+class ScanTask {
+public:
+ ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner, vectorized::BlockUPtr free_block)
+ : scanner(std::move(delegate_scanner)), current_block(std::move(free_block)) {}
+
+private:
+ // whether current scanner is finished
+ bool eos = false;
+ Status status = Status::OK();
+
+public:
+ std::weak_ptr<ScannerDelegate> scanner;
+ // cache the block of current loop
+ vectorized::BlockUPtr current_block;
+ // only take the size of the first block as estimated size
+ bool first_block = true;
+ // time of the latest submission to the scheduler, in nanoseconds;
+ // zero-initialized so it never holds an indeterminate value before the first submit
+ uint64_t last_submit_time = 0;
+
+ // Record the result of the latest scan round. END_OF_FILE marks the scanner as
+ // finished (`eos`) and is still reported as OK by status_ok().
+ void set_status(Status _status) {
+ if (_status.is<ErrorCode::END_OF_FILE>()) {
+ // set `eos` if `END_OF_FILE`, don't take `END_OF_FILE` as error
+ eos = true;
+ }
+ status = std::move(_status);
+ }
+ Status get_status() const { return status; }
+ // true for OK and for END_OF_FILE (end of data is not an error)
+ bool status_ok() const { return status.ok() || status.is<ErrorCode::END_OF_FILE>(); }
+ bool is_eos() const { return eos; }
+ void set_eos(bool _eos) { eos = _eos; }
+
+ // reuse current running scanner
+ // reset `eos` and `status`
+ // `first_block` is used to update `_free_blocks_memory_usage`, and take the first block size
+ // as the `_estimated_block_size`. It has updated `_free_blocks_memory_usage`, so don't reset.
+ void reuse_scanner(std::weak_ptr<ScannerDelegate> next_scanner) {
+ scanner = std::move(next_scanner);
+ eos = false;
+ status = Status::OK();
+ }
+};
+
// ScannerContext is responsible for recording the execution status
// of a group of Scanners corresponding to a ScanNode.
// Including how many scanners are being scheduled, and maintaining
@@ -81,88 +122,49 @@ public:
virtual Status init();
vectorized::BlockUPtr get_free_block();
- void return_free_block(std::unique_ptr<vectorized::Block> block);
+ void return_free_block(vectorized::BlockUPtr block);
- // Append blocks from scanners to the blocks queue.
- virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
blocks);
- // Get next block from blocks queue. Called by ScanNode
+ // Get next block from blocks queue. Called by ScanNode/ScanOperator
// Set eos to true if there is no more data to read.
- // And if eos is true, the block returned must be nullptr.
- virtual Status get_block_from_queue(RuntimeState* state,
vectorized::BlockUPtr* block,
- bool* eos, int id);
+ virtual Status get_block_from_queue(RuntimeState* state,
vectorized::Block* block, bool* eos,
+ int id, bool wait = true);
[[nodiscard]] Status validate_block_schema(Block* block);
- // When a scanner complete a scan, this method will be called
- // to return the scanner to the list for next scheduling.
- void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate>
scanner);
+ // submit the running scanner to thread pool in `ScannerScheduler`
+ // set the next scanned block to `ScanTask::current_block`
+ // set the error state to `ScanTask::status`
+ // set the `eos` to `ScanTask::eos` if there is no more data in current
scanner
+ void submit_scan_task(std::shared_ptr<ScanTask> scan_task);
- void set_status_on_error(const Status& status, bool need_lock = true);
+ // append the running scanner and its cached block to `_blocks_queue`
+ virtual void append_block_to_queue(std::shared_ptr<ScanTask> scan_task);
- Status status() {
- if (_process_status.is<ErrorCode::END_OF_FILE>()) {
- return Status::OK();
- }
- return _process_status;
- }
+ void set_status_on_error(const Status& status);
// Return true if this ScannerContext need no more process
bool done() const { return _is_finished || _should_stop; }
bool is_finished() { return _is_finished.load(); }
bool should_stop() { return _should_stop.load(); }
- void inc_num_running_scanners(int32_t scanner_inc);
-
- int get_num_running_scanners() const { return _num_running_scanners; }
-
- int get_num_unfinished_scanners() const { return _num_unfinished_scanners;
}
-
- void get_next_batch_of_scanners(std::list<std::weak_ptr<ScannerDelegate>>*
current_run);
-
virtual std::string debug_string();
RuntimeState* state() { return _state; }
-
- void incr_num_ctx_scheduling(int64_t num) {
_scanner_ctx_sched_counter->update(num); }
-
- int64_t num_ctx_scheduled() { return _scanner_ctx_sched_counter->value(); }
void incr_ctx_scheduling_time(int64_t num) {
_scanner_ctx_sched_time->update(num); }
std::string parent_name();
virtual bool empty_in_queue(int id);
- // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when
executing shared scan
- inline bool should_be_scheduled() const {
- return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
- (_serving_blocks_num < allowed_blocks_num());
- }
-
- int get_available_thread_slot_num() {
- int thread_slot_num = 0;
- thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) /
_block_per_scanner;
- thread_slot_num = std::min(thread_slot_num, _max_thread_num -
_num_running_scanners);
- if (thread_slot_num <= 0) {
- thread_slot_num = 1;
- }
- return thread_slot_num;
- }
-
- int32_t allowed_blocks_num() const {
- int32_t blocks_num = std::min(_free_blocks_capacity,
- int32_t((_max_bytes_in_queue +
_estimated_block_bytes - 1) /
- _estimated_block_bytes));
- return blocks_num;
- }
-
SimplifiedScanScheduler* get_simple_scan_scheduler() { return
_simple_scan_scheduler; }
- virtual void reschedule_scanner_ctx();
void stop_scanners(RuntimeState* state);
int32_t get_max_thread_num() const { return _max_thread_num; }
void set_max_thread_num(int32_t num) { _max_thread_num = num; }
+ int batch_size() const { return _batch_size; }
+
// the unique id of this context
std::string ctx_id;
TUniqueId _query_id;
@@ -176,10 +178,16 @@ protected:
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>&
scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_, const int
num_parallel_instances,
- pipeline::ScanLocalStateBase* local_state,
- std::shared_ptr<pipeline::ScanDependency> dependency);
+ pipeline::ScanLocalStateBase* local_state);
+
+ /// Hook called when this context stops producing blocks: on cancellation, on an
+ /// error status, when all scanners have finished, or from stop_scanners().
+ /// No-op by default; subclasses may override it.
+ virtual void _set_scanner_done() {}
- void _set_scanner_done();
+ /// Try to submit more of the remaining (not yet scheduled) scanners; invoked after a
+ /// block is consumed in get_block_from_queue().
+ /// Four criteria to determine whether to increase the parallelism of the scanners
+ /// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
+ /// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks
+ /// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up
+ /// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
+ /// Scaling is also skipped when no unscheduled scanner remains, or when the previous
+ /// scale-up did not bring the wait ratio below 80% of its earlier value.
+ void _try_to_scale_up();
RuntimeState* _state = nullptr;
VScanNode* _parent = nullptr;
@@ -189,97 +197,52 @@ protected:
const TupleDescriptor* _output_tuple_desc = nullptr;
const RowDescriptor* _output_row_descriptor = nullptr;
- // _transfer_lock is used to protect the critical section
- // where the ScanNode and ScannerScheduler interact.
- // Including access to variables such as blocks_queue, _process_status,
_is_finished, etc.
std::mutex _transfer_lock;
- // The blocks got from scanners will be added to the "blocks_queue".
- // And the upper scan node will be as a consumer to fetch blocks from this
queue.
- // Should be protected by "_transfer_lock"
- std::list<vectorized::BlockUPtr> _blocks_queue;
- // Wait in get_block_from_queue(), by ScanNode.
std::condition_variable _blocks_queue_added_cv;
- // Wait in clear_and_join(), by ScanNode.
- std::condition_variable _ctx_finish_cv;
-
- // The following 3 variables control the process of the scanner scheduling.
- // Use _transfer_lock to protect them.
- // 1. _process_status
- // indicates the global status of this scanner context.
- // Set to non-ok if encounter errors.
- // And if it is non-ok, the scanner process should stop.
- // Set be set by either ScanNode or ScannerScheduler.
- // 2. _should_stop
- // Always be set by ScanNode.
- // True means no more data need to be read(reach limit or closed)
- // 3. _is_finished
- // Always be set by ScannerScheduler.
- // True means all scanners are finished to scan.
- Status _process_status;
+ std::list<std::shared_ptr<ScanTask>> _blocks_queue;
+
+ Status _process_status = Status::OK();
std::atomic_bool _should_stop = false;
std::atomic_bool _is_finished = false;
// Lazy-allocated blocks for all scanners to share, for memory reuse.
moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
- std::atomic<int32_t> _serving_blocks_num = 0;
- // The current number of free blocks available to the scanners.
- // Used to limit the memory usage of the scanner.
- // NOTE: this is NOT the size of `_free_blocks`.
- int32_t _free_blocks_capacity = 0;
- int64_t _estimated_block_bytes = 0;
int _batch_size;
// The limit from SQL's limit clause
int64_t limit;
- // Current number of running scanners.
- std::atomic_int32_t _num_running_scanners = 0;
- // Current number of ctx being scheduled.
- // After each Scanner finishes a task, it will put the corresponding ctx
- // back into the scheduling queue.
- // Therefore, there will be multiple pointer of same ctx in the scheduling
queue.
- // Here we record the number of ctx in the scheduling queue to clean up
at the end.
- std::atomic_int32_t _num_scheduling_ctx = 0;
- // Num of unfinished scanners. Should be set in init()
- std::atomic_int32_t _num_unfinished_scanners = 0;
- // Max number of scan thread for this scanner context.
int32_t _max_thread_num = 0;
- // How many blocks a scanner can use in one task.
- int32_t _block_per_scanner = 0;
-
- // The current bytes of blocks in blocks queue
- int64_t _cur_bytes_in_queue = 0;
- // The max limit bytes of blocks in blocks queue
- const int64_t _max_bytes_in_queue;
-
+ int64_t _max_bytes_in_queue;
doris::vectorized::ScannerScheduler* _scanner_scheduler;
SimplifiedScanScheduler* _simple_scan_scheduler = nullptr;
- // List "scanners" saves all "unfinished" scanners.
- // The scanner scheduler will pop scanners from this list, run scanner,
- // and then if the scanner is not finished, will be pushed back to this
list.
- // Not need to protect by lock, because only one scheduler thread will
access to it.
- std::mutex _scanners_lock;
- // Scanner's ownership belong to vscannode or scanoperator, scanner
context does not own it.
- // ScannerContext has to check if scanner is deconstructed before use it.
- std::list<std::weak_ptr<ScannerDelegate>> _scanners;
+ moodycamel::ConcurrentQueue<std::weak_ptr<ScannerDelegate>> _scanners;
+ int32_t _num_scheduled_scanners = 0;
+ int32_t _num_finished_scanners = 0;
+ int32_t _num_running_scanners = 0;
// weak pointer for _scanners, used in stop function
std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
- std::vector<int64_t> _finished_scanner_runtime;
- std::vector<int64_t> _finished_scanner_rows_read;
- std::vector<int64_t> _finished_scanner_wait_worker_time;
-
const int _num_parallel_instances;
-
std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
- RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
- RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
- RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
- RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage =
nullptr;
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
-
- std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
+ RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage_mark =
nullptr;
+ RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
+ RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
+
+ // for scaling up the running scanners
+ std::mutex _free_blocks_lock;
+ size_t _estimated_block_size = 0;
+ int64_t _free_blocks_memory_usage = 0;
+ int64_t _last_scale_up_time = 0;
+ int64_t _last_fetch_time = 0;
+ int64_t _total_wait_block_time = 0;
+ double _last_wait_duration_ratio = 0;
+ const int64_t SCALE_UP_DURATION = 5000; // 5000ms
+ const float WAIT_BLOCK_DURATION_RATIO = 0.5;
+ const float SCALE_UP_RATIO = 0.5;
+ float MAX_SCALE_UP_RATIO;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index d8678bc0dc3..40fff7ed70c 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -69,10 +69,6 @@ ScannerScheduler::~ScannerScheduler() {
return;
}
- for (int i = 0; i < QUEUE_NUM; i++) {
- delete _pending_queues[i];
- }
- delete[] _pending_queues;
_deregister_metrics();
}
@@ -81,18 +77,12 @@ void ScannerScheduler::stop() {
return;
}
- for (int i = 0; i < QUEUE_NUM; i++) {
- _pending_queues[i]->shutdown();
- }
-
_is_closed = true;
- _scheduler_pool->shutdown();
_local_scan_thread_pool->shutdown();
_remote_scan_thread_pool->shutdown();
_limited_scan_thread_pool->shutdown();
- _scheduler_pool->wait();
_local_scan_thread_pool->join();
_remote_scan_thread_pool->join();
_limited_scan_thread_pool->wait();
@@ -101,24 +91,12 @@ void ScannerScheduler::stop() {
}
Status ScannerScheduler::init(ExecEnv* env) {
- // 1. scheduling thread pool and scheduling queues
- static_cast<void>(ThreadPoolBuilder("SchedulingThreadPool")
- .set_min_threads(QUEUE_NUM)
- .set_max_threads(QUEUE_NUM)
- .build(&_scheduler_pool));
-
- _pending_queues = new
BlockingQueue<std::shared_ptr<ScannerContext>>*[QUEUE_NUM];
- for (int i = 0; i < QUEUE_NUM; i++) {
- _pending_queues[i] = new
BlockingQueue<std::shared_ptr<ScannerContext>>(INT32_MAX);
- static_cast<void>(_scheduler_pool->submit_func([this, i] {
this->_schedule_thread(i); }));
- }
-
- // 2. local scan thread pool
+ // 1. local scan thread pool
_local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size, "local_scan");
- // 3. remote scan thread pool
+ // 2. remote scan thread pool
_remote_thread_pool_max_size =
config::doris_max_remote_scanner_thread_pool_thread_num != -1
?
config::doris_max_remote_scanner_thread_pool_thread_num
: std::max(512,
CpuInfo::num_cores() * 10);
@@ -128,7 +106,7 @@ Status ScannerScheduler::init(ExecEnv* env) {
_remote_thread_pool_max_size,
config::doris_remote_scanner_thread_pool_queue_size,
"RemoteScanThreadPool");
- // 4. limited scan thread pool
+ // 3. limited scan thread pool
static_cast<void>(ThreadPoolBuilder("LimitedScanThreadPool")
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
@@ -139,155 +117,97 @@ Status ScannerScheduler::init(ExecEnv* env) {
return Status::OK();
}
-Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx) {
+void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
+ std::shared_ptr<ScanTask> scan_task) {
+ scan_task->last_submit_time = GetCurrentTimeNanos();
if (ctx->done()) {
- return Status::EndOfFile("ScannerContext is done");
- }
- ctx->queue_idx = (_queue_idx++ % QUEUE_NUM);
- if (!_pending_queues[ctx->queue_idx]->blocking_put(ctx)) {
- return Status::InternalError("failed to submit scanner context to
scheduler");
- }
- return Status::OK();
-}
-
-std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
- ThreadPool::ExecutionMode mode, int max_concurrency) {
- return _limited_scan_thread_pool->new_token(mode, max_concurrency);
-}
-
-void ScannerScheduler::_schedule_thread(int queue_id) {
- BlockingQueue<std::shared_ptr<ScannerContext>>* queue =
_pending_queues[queue_id];
- while (!_is_closed) {
- std::shared_ptr<ScannerContext> ctx;
- bool ok = queue->blocking_get(&ctx);
- if (!ok) {
- // maybe closed
- continue;
- }
-
- _schedule_scanners(ctx);
- // If ctx is done, no need to schedule it again.
- // But should notice that there may still scanners running in scanner
pool.
+ return;
}
-}
-
-void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx)
{
auto task_lock = ctx->task_exec_ctx();
if (task_lock == nullptr) {
LOG(INFO) << "could not lock task execution context, query " <<
ctx->debug_string()
<< " maybe finished";
return;
}
- MonotonicStopWatch watch;
- watch.reset();
- watch.start();
- ctx->incr_num_ctx_scheduling(1);
-
- if (ctx->done()) {
- return;
- }
-
- std::list<std::weak_ptr<ScannerDelegate>> this_run;
- ctx->get_next_batch_of_scanners(&this_run);
- if (this_run.empty()) {
- // There will be 2 cases when this_run is empty:
- // 1. The blocks queue reaches limit.
- // The consumer will continue scheduling the ctx.
- // 2. All scanners are running.
- // There running scanner will schedule the ctx after they are
finished.
- // So here we just return to stop scheduling ctx.
- return;
- }
-
- ctx->inc_num_running_scanners(this_run.size());
// Submit scanners to thread pool
// TODO(cmy): How to handle this "nice"?
int nice = 1;
- auto iter = this_run.begin();
if (ctx->thread_token != nullptr) {
- // TODO llj tg how to treat this?
- while (iter != this_run.end()) {
- std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
- if (scanner_delegate == nullptr) {
- // Has to ++, or there is a dead loop
- iter++;
- continue;
- }
- scanner_delegate->_scanner->start_wait_worker_timer();
- auto s = ctx->thread_token->submit_func([this, scanner_ref =
*iter, ctx]() {
- this->_scanner_scan(this, ctx, scanner_ref);
- });
- if (s.ok()) {
- iter++;
- } else {
- ctx->set_status_on_error(s);
- break;
- }
+ std::shared_ptr<ScannerDelegate> scanner_delegate =
scan_task->scanner.lock();
+ if (scanner_delegate == nullptr) {
+ return;
+ }
+
+ scanner_delegate->_scanner->start_wait_worker_timer();
+ auto s = ctx->thread_token->submit_func(
+ [this, scanner_ref = scan_task, ctx]() {
this->_scanner_scan(ctx, scanner_ref); });
+ if (!s.ok()) {
+ scan_task->set_status(s);
+ ctx->append_block_to_queue(scan_task);
+ return;
}
} else {
- while (iter != this_run.end()) {
- std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
- if (scanner_delegate == nullptr) {
- // Has to ++, or there is a dead loop
- iter++;
- continue;
- }
- scanner_delegate->_scanner->start_wait_worker_timer();
- TabletStorageType type =
scanner_delegate->_scanner->get_storage_type();
- bool ret = false;
- if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
- if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
- auto work_func = [this, scanner_ref = *iter, ctx]() {
- this->_scanner_scan(this, ctx, scanner_ref);
- };
- SimplifiedScanTask simple_scan_task = {work_func, ctx};
- ret =
scan_sche->get_scan_queue()->try_put(simple_scan_task);
- } else {
- PriorityThreadPool::Task task;
- task.work_function = [this, scanner_ref = *iter, ctx]() {
- this->_scanner_scan(this, ctx, scanner_ref);
- };
- task.priority = nice;
- ret = _local_scan_thread_pool->offer(task);
- }
+ std::shared_ptr<ScannerDelegate> scanner_delegate =
scan_task->scanner.lock();
+ if (scanner_delegate == nullptr) {
+ return;
+ }
+
+ scanner_delegate->_scanner->start_wait_worker_timer();
+ TabletStorageType type =
scanner_delegate->_scanner->get_storage_type();
+ bool ret = false;
+ if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+ if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
+ auto work_func = [this, scanner_ref = scan_task, ctx]() {
+ this->_scanner_scan(ctx, scanner_ref);
+ };
+ SimplifiedScanTask simple_scan_task = {work_func, ctx};
+ ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
} else {
PriorityThreadPool::Task task;
- task.work_function = [this, scanner_ref = *iter, ctx]() {
- this->_scanner_scan(this, ctx, scanner_ref);
+ task.work_function = [this, scanner_ref = scan_task, ctx]() {
+ this->_scanner_scan(ctx, scanner_ref);
};
task.priority = nice;
- ret = _remote_scan_thread_pool->offer(task);
- }
- if (ret) {
- iter++;
- } else {
- ctx->set_status_on_error(
- Status::InternalError("failed to submit scanner to
scanner pool"));
- break;
+ ret = _local_scan_thread_pool->offer(task);
}
+ } else {
+ PriorityThreadPool::Task task;
+ task.work_function = [this, scanner_ref = scan_task, ctx]() {
+ this->_scanner_scan(ctx, scanner_ref);
+ };
+ task.priority = nice;
+ ret = _remote_scan_thread_pool->offer(task);
+ }
+ if (!ret) {
+ scan_task->set_status(
+ Status::InternalError("Failed to submit scanner to scanner
pool"));
+ ctx->append_block_to_queue(scan_task);
+ return;
}
}
- ctx->incr_ctx_scheduling_time(watch.elapsed_time());
}
-void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
- std::shared_ptr<ScannerContext> ctx,
- std::weak_ptr<ScannerDelegate>
scanner_ref) {
+std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
+ ThreadPool::ExecutionMode mode, int max_concurrency) {
+ return _limited_scan_thread_pool->new_token(mode, max_concurrency);
+}
+
+void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
+ std::shared_ptr<ScanTask> scan_task) {
+ // Record the time from scanner submission to actual execution, in nanoseconds.
+ ctx->incr_ctx_scheduling_time(GetCurrentTimeNanos() -
scan_task->last_submit_time);
auto task_lock = ctx->task_exec_ctx();
if (task_lock == nullptr) {
- // LOG(WARNING) << "could not lock task execution context, query " <<
print_id(_query_id)
- // << " maybe finished";
return;
}
- //LOG_EVERY_N(INFO, 100) << "start running scanner from ctx " <<
ctx->debug_string();
- // will release scanner if it is the last one, task lock is hold here, to
ensure
- // that scanner could call scannode's method during deconstructor
- std::shared_ptr<ScannerDelegate> scanner_delegate = scanner_ref.lock();
- auto& scanner = scanner_delegate->_scanner;
+
+ std::shared_ptr<ScannerDelegate> scanner_delegate =
scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
return;
}
+
+ VScannerSPtr& scanner = scanner_delegate->_scanner;
SCOPED_ATTACH_TASK(scanner->runtime_state());
// for cpu hard limit, thread name should not be reset
if (ctx->_should_reset_thread_name) {
@@ -310,14 +230,13 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler,
if (!scanner->is_init()) {
status = scanner->init();
if (!status.ok()) {
- ctx->set_status_on_error(status);
eos = true;
}
}
+
if (!eos && !scanner->is_open()) {
status = scanner->open(state);
if (!status.ok()) {
- ctx->set_status_on_error(status);
eos = true;
}
scanner->set_opened();
@@ -325,46 +244,21 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler,
static_cast<void>(scanner->try_append_late_arrival_runtime_filter());
- // Because we use thread pool to scan data from storage. One scanner can't
- // use this thread too long, this can starve other query's scanner. So, we
- // need yield this thread when we do enough work. However, OlapStorage read
- // data in pre-aggregate mode, then we can't use storage returned data to
- // judge if we need to yield. So we record all raw data read in this round
- // scan, if this exceeds row number or bytes threshold, we yield this
thread.
- std::vector<vectorized::BlockUPtr> blocks;
- int64_t raw_bytes_read = 0;
- int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
- int num_rows_in_block = 0;
-
- // Only set to true when ctx->done() return true.
- // Use this flag because we need distinguish eos from `should_stop`.
- // If eos is true, we still need to return blocks,
- // but is should_stop is true, no need to return blocks
- bool should_stop = false;
- // Has to wait at least one full block, or it will cause a lot of schedule
task in priority
- // queue, it will affect query latency and query concurrency for example
ssb 3.3.
- auto should_do_scan = [&, batch_size = state->batch_size(),
- time = state->wait_full_block_schedule_times()]() {
- if (raw_bytes_read < raw_bytes_threshold) {
- return true;
- } else if (num_rows_in_block < batch_size) {
- return raw_bytes_read < raw_bytes_threshold * time;
- }
- return false;
- };
-
- while (!eos && should_do_scan()) {
- // TODO llj task group should should_yield?
+ bool first_read = true;
+ while (!eos) {
if (UNLIKELY(ctx->done())) {
- // No need to set status on error here.
- // Because done() maybe caused by "should_stop"
- should_stop = true;
+ eos = true;
break;
}
+ BlockUPtr free_block = nullptr;
+ if (first_read) {
+ status = scanner->get_block_after_projects(state,
scan_task->current_block.get(), &eos);
+ first_read = false;
+ } else {
+ free_block = ctx->get_free_block();
+ status = scanner->get_block_after_projects(state,
free_block.get(), &eos);
+ }
- BlockUPtr block = ctx->get_free_block();
-
- status = scanner->get_block_after_projects(state, block.get(), &eos);
// The VFileScanner for external table may try to open not exist files,
// Because FE file cache for external table may out of date.
// So, NOT_FOUND for VFileScanner is not a fail case.
@@ -374,49 +268,36 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler,
!status.is<ErrorCode::NOT_FOUND>()))) {
LOG(WARNING) << "Scan thread read VScanner failed: " <<
status.to_string();
break;
- }
- VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " <<
eos;
- if (status.is<ErrorCode::NOT_FOUND>()) {
+ } else if (status.is<ErrorCode::NOT_FOUND>()) {
// The only case in this "if" branch is external table file delete
and fe cache has not been updated yet.
// Set status to OK.
status = Status::OK();
eos = true;
+ break;
}
- raw_bytes_read += block->allocated_bytes();
- num_rows_in_block += block->rows();
- if (UNLIKELY(block->rows() == 0)) {
- ctx->return_free_block(std::move(block));
- } else {
- if (!blocks.empty() && blocks.back()->rows() + block->rows() <=
state->batch_size()) {
- vectorized::MutableBlock mutable_block(blocks.back().get());
- static_cast<void>(mutable_block.merge(*block));
-
blocks.back().get()->set_columns(std::move(mutable_block.mutable_columns()));
- ctx->return_free_block(std::move(block));
- } else {
- blocks.push_back(std::move(block));
- }
+ if (!first_read && free_block) {
+ vectorized::MutableBlock
mutable_block(scan_task->current_block.get());
+ static_cast<void>(mutable_block.merge(*free_block));
+
scan_task->current_block->set_columns(std::move(mutable_block.mutable_columns()));
+ ctx->return_free_block(std::move(free_block));
+ }
+ if (scan_task->current_block->rows() >= ctx->batch_size()) {
+ break;
}
} // end for while
- // if we failed, check status.
if (UNLIKELY(!status.ok())) {
- // _transfer_done = true;
- ctx->set_status_on_error(status);
+ scan_task->set_status(status);
eos = true;
- blocks.clear();
- } else if (should_stop) {
- // No need to return blocks because of should_stop, just delete them
- blocks.clear();
- } else if (!blocks.empty()) {
- ctx->append_blocks_to_queue(blocks);
}
scanner->update_scan_cpu_timer();
- if (eos || should_stop) {
+ if (eos) {
scanner->mark_to_need_to_close();
}
- ctx->push_back_scanner_and_reschedule(scanner_delegate);
+ scan_task->set_eos(eos);
+ ctx->append_block_to_queue(scan_task);
}
void ScannerScheduler::_register_metrics() {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 9fedd27dbd8..7a602038956 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -37,25 +37,18 @@ class BlockingQueue;
namespace doris::vectorized {
class ScannerDelegate;
+class ScanTask;
class ScannerContext;
// Responsible for the scheduling and execution of all Scanners of a BE node.
-// ScannerScheduler has two types of thread pools:
-// 1. Scheduling thread pool
-// Responsible for Scanner scheduling.
-// A set of Scanners for a query will be encapsulated into a ScannerContext
-// and submitted to the ScannerScheduler's scheduling queue.
-// There are multiple scheduling queues in ScannerScheduler, and each
scheduling queue
-// is handled by a scheduling thread.
-// The scheduling thread is scheduled in granularity of ScannerContext,
-// that is, a group of Scanners in a ScannerContext are scheduled at a
time.
-//
-//2. Execution thread pool
-// The scheduling thread will submit the Scanners selected from the
ScannerContext
+// Execution thread pool
+// When a ScannerContext is launched, it submits its running scanners to this scheduler.
+// The scheduler submits each running scanner, together with its ScannerContext,
// to the execution thread pool to do the actual scan task.
-// Each Scanner will act as a producer, read a group of blocks and put
them into
+// Each Scanner will act as a producer, read the next block and put it into
// the corresponding block queue.
// The corresponding ScanNode will act as a consumer to consume blocks
from the block queue.
+// After the block is consumed, the unfinished scanner is resubmitted to this scheduler.
class ScannerScheduler {
public:
ScannerScheduler();
@@ -63,7 +56,7 @@ public:
[[nodiscard]] Status init(ExecEnv* env);
- [[nodiscard]] Status submit(std::shared_ptr<ScannerContext> ctx);
+ void submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask>
scan_task);
void stop();
@@ -73,32 +66,13 @@ public:
int remote_thread_pool_max_size() const { return
_remote_thread_pool_max_size; }
private:
- // scheduling thread function
- void _schedule_thread(int queue_id);
- // schedule scanners in a certain ScannerContext
- void _schedule_scanners(std::shared_ptr<ScannerContext> ctx);
- // execution thread function
- void _scanner_scan(ScannerScheduler* scheduler,
std::shared_ptr<ScannerContext> ctx,
- std::weak_ptr<ScannerDelegate> scanner);
+ static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
+ std::shared_ptr<ScanTask> scan_task);
void _register_metrics();
static void _deregister_metrics();
- // Scheduling queue number.
- // TODO: make it configurable.
- static const int QUEUE_NUM = 4;
- // The ScannerContext will be submitted to the pending queue roundrobin.
- // _queue_idx pointer to the current queue.
- // Use std::atomic_uint to prevent numerical overflow from memory out of
bound.
- // The scheduler thread will take ctx from pending queue, schedule it,
- // and put it to the _scheduling_map.
- // If any scanner finish, it will take ctx from and put it to pending
queue again.
- std::atomic_uint _queue_idx = {0};
- BlockingQueue<std::shared_ptr<ScannerContext>>** _pending_queues = nullptr;
-
- // scheduling thread pool
- std::unique_ptr<ThreadPool> _scheduler_pool;
// execution thread pool
// _local_scan_thread_pool is for local scan task(typically, olap scanner)
// _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs,
etc.)
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 568b206db5e..f2f4e242bc6 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -197,7 +197,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
if (_scanner_ctx) {
DCHECK(!_eos && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
-
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
}
if (_shared_scan_opt) {
LOG(INFO) << "instance shared scan enabled"
@@ -219,7 +218,6 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
: Status::OK());
if (_scanner_ctx) {
RETURN_IF_ERROR(_scanner_ctx->init());
-
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
}
}
@@ -246,14 +244,10 @@ Status VScanNode::get_next(RuntimeState* state,
vectorized::Block* block, bool*
}};
if (state->is_cancelled()) {
- // ISSUE: https://github.com/apache/doris/issues/16360
- // _scanner_ctx may be null here, see: `VScanNode::alloc_resource`
(_eos == null)
if (_scanner_ctx) {
- _scanner_ctx->set_status_on_error(Status::Cancelled("query
cancelled"));
- return _scanner_ctx->status();
- } else {
- return Status::Cancelled("query cancelled");
+ _scanner_ctx->stop_scanners(state);
}
+ return Status::Cancelled("Query cancelled in ScanNode");
}
if (_eos) {
@@ -261,16 +255,7 @@ Status VScanNode::get_next(RuntimeState* state,
vectorized::Block* block, bool*
return Status::OK();
}
- vectorized::BlockUPtr scan_block = nullptr;
- RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, &scan_block,
eos, _context_queue_id));
- if (*eos) {
- DCHECK(scan_block == nullptr);
- return Status::OK();
- }
-
- // get scanner's block memory
- block->swap(*scan_block);
- _scanner_ctx->return_free_block(std::move(scan_block));
+ RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, block, eos,
_context_queue_id));
reached_limit(block, eos);
if (*eos) {
@@ -294,16 +279,14 @@ Status VScanNode::_init_profile() {
runtime_profile()->add_child(_scanner_profile.get(), true, nullptr);
_memory_usage_counter = ADD_LABEL_COUNTER(_scanner_profile, "MemoryUsage");
- _queued_blocks_memory_usage =
- _scanner_profile->AddHighWaterMarkCounter("QueuedBlocks",
TUnit::BYTES, "MemoryUsage");
_free_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks",
TUnit::BYTES, "MemoryUsage");
_newly_create_free_blocks_num =
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum",
TUnit::UNIT);
+ _scale_up_scanners_counter = ADD_COUNTER(_scanner_profile,
"NumScaleUpScanners", TUnit::UNIT);
// time of transfer thread to wait for block from scan thread
_scanner_wait_batch_timer = ADD_TIMER(_scanner_profile,
"ScannerBatchWaitTime");
_scanner_sched_counter = ADD_COUNTER(_scanner_profile,
"ScannerSchedCount", TUnit::UNIT);
- _scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile,
"ScannerCtxSchedCount", TUnit::UNIT);
_scanner_ctx_sched_time = ADD_TIMER(_scanner_profile,
"ScannerCtxSchedTime");
_scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime");
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index 0c39d15b57f..b83e9211214 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -377,7 +377,6 @@ protected:
RuntimeProfile::Counter* _filter_timer = nullptr;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
- RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
@@ -387,8 +386,8 @@ protected:
RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
RuntimeProfile::Counter* _memory_usage_counter = nullptr;
- RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage =
nullptr;
RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
+ RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
std::unordered_map<std::string, int> _colname_to_slot_id;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8b5e24a52aa..8effd6adae2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -74,6 +74,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
+ public static final String NUM_SCANNER_THREADS = "num_scanner_threads";
+ public static final String SCANNER_SCALE_UP_RATIO =
"scanner_scale_up_ratio";
public static final String QUERY_TIMEOUT = "query_timeout";
public static final String ANALYZE_TIMEOUT = "analyze_timeout";
@@ -553,6 +555,20 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT)
public long maxScanQueueMemByte = 2147483648L / 20;
+ @VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true,
description = {
+ "ScanNode扫描数据的最大并发,默认为0,采用BE的doris_scanner_thread_pool_thread_num",
+ "The max threads to read data of ScanNode, "
+ + "default 0, use doris_scanner_thread_pool_thread_num in
be.conf"
+ })
+ public int numScannerThreads = 0;
+
+ @VariableMgr.VarAttr(name = SCANNER_SCALE_UP_RATIO, needForward = true,
description = {
+ "ScanNode自适应的增加扫描并发,最大允许增长的并发倍率,默认为0,关闭该功能",
+ "The max multiple of increasing the concurrency of scanners
adaptively, "
+ + "default 0, turn off scaling up"
+ })
+ public double scannerScaleUpRatio = 0;
+
@VariableMgr.VarAttr(name = ENABLE_SPILLING)
public boolean enableSpilling = false;
@@ -1790,6 +1806,14 @@ public class SessionVariable implements Serializable,
Writable {
return maxScanQueueMemByte;
}
+ public int getNumScannerThreads() {
+ return numScannerThreads;
+ }
+
+ public double getScannerScaleUpRatio() {
+ return scannerScaleUpRatio;
+ }
+
public int getQueryTimeoutS() {
return queryTimeoutS;
}
@@ -1962,7 +1986,15 @@ public class SessionVariable implements Serializable,
Writable {
}
public void setMaxScanQueueMemByte(long scanQueueMemByte) {
- this.maxScanQueueMemByte = Math.min(scanQueueMemByte, maxExecMemByte /
2);
+ this.maxScanQueueMemByte = scanQueueMemByte;
+ }
+
+ public void setNumScannerThreads(int numScannerThreads) {
+ this.numScannerThreads = numScannerThreads;
+ }
+
+ public void setScannerScaleUpRatio(double scannerScaleUpRatio) {
+ this.scannerScaleUpRatio = scannerScaleUpRatio;
}
public boolean isSqlQuoteShowCreate() {
@@ -2771,7 +2803,9 @@ public class SessionVariable implements Serializable,
Writable {
public TQueryOptions toThrift() {
TQueryOptions tResult = new TQueryOptions();
tResult.setMemLimit(maxExecMemByte);
- tResult.setScanQueueMemLimit(Math.min(maxScanQueueMemByte,
maxExecMemByte / 20));
+ tResult.setScanQueueMemLimit(maxScanQueueMemByte);
+ tResult.setNumScannerThreads(numScannerThreads);
+ tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
// TODO chenhao, reservation will be calculated by cost
tResult.setMinReservation(0);
@@ -3067,7 +3101,9 @@ public class SessionVariable implements Serializable,
Writable {
public TQueryOptions getQueryOptionVariables() {
TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setMemLimit(maxExecMemByte);
- queryOptions.setScanQueueMemLimit(Math.min(maxScanQueueMemByte,
maxExecMemByte / 20));
+ queryOptions.setScanQueueMemLimit(maxScanQueueMemByte);
+ queryOptions.setNumScannerThreads(numScannerThreads);
+ queryOptions.setScannerScaleUpRatio(scannerScaleUpRatio);
queryOptions.setQueryTimeout(queryTimeoutS);
queryOptions.setInsertTimeout(insertTimeoutS);
queryOptions.setAnalyzeTimeout(analyzeTimeoutS);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index b0e3a50628b..c321aa2660e 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -271,6 +271,8 @@ struct TQueryOptions {
97: optional i64 parallel_scan_min_rows_per_scanner = 0;
98: optional bool skip_bad_tablet = false;
+ // Increase the concurrency of scanners adaptively; this is the maximum ratio by which to scale up
+ 99: optional double scanner_scale_up_ratio = 0;
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]