This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 920aeac6940 [enhancement](scanner) add a lower bound for bytes in scanner queue (#29624)
920aeac6940 is described below
commit 920aeac69401b10377dd634e6b8d9543ec32b062
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sat Jan 27 00:06:19 2024 +0800
[enhancement](scanner) add a lower bound for bytes in scanner queue (#29624)
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/vec/exec/scan/scanner_context.cpp | 10 ++++++++++
be/src/vec/exec/scan/scanner_scheduler.cpp | 8 ++------
4 files changed, 14 insertions(+), 6 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 137d2c2fb0c..fdcc71a3616 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -260,6 +260,7 @@ DEFINE_mInt32(doris_scanner_queue_size, "1024");
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
+DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864");
// number of max scan keys
DEFINE_mInt32(doris_max_scan_key_num, "48");
// the max number of push down values of a single column.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8a33c8c19d1..325a8d91981 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -302,6 +302,7 @@ DECLARE_mInt32(doris_scanner_queue_size);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
+DECLARE_mInt32(min_bytes_in_scanner_queue);
// number of max scan keys
DECLARE_mInt32(doris_max_scan_key_num);
// the max number of push down values of a single column.
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 0b19f389920..7691f159509 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -45,6 +45,9 @@ namespace doris::vectorized {
using namespace std::chrono_literals;
+static bvar::Status<int64_t> g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0);
+static bvar::Status<int64_t> g_num_running_scanners("doris_num_running_scanners", 0);
+
ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
@@ -179,6 +182,9 @@ Status ScannerContext::init() {
_free_blocks_capacity = _max_thread_num * _block_per_scanner;
auto block = get_free_block();
_estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
+ int min_blocks = (config::min_bytes_in_scanner_queue + _estimated_block_bytes - 1) /
+ _estimated_block_bytes;
+ _free_blocks_capacity = std::max(_free_blocks_capacity, min_blocks);
return_free_block(std::move(block));
#ifndef BE_TEST
@@ -258,6 +264,7 @@ void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
}
_blocks_queue_added_cv.notify_one();
_queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
+ g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
}
bool ScannerContext::empty_in_queue(int id) {
@@ -334,6 +341,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
}
}
+ g_bytes_in_scanner_queue.set_value(_cur_bytes_in_queue);
if (!merge_blocks.empty()) {
vectorized::MutableBlock m(block->get());
for (auto& merge_block : merge_blocks) {
@@ -375,6 +383,7 @@ Status ScannerContext::validate_block_schema(Block* block) {
void ScannerContext::inc_num_running_scanners(int32_t inc) {
std::lock_guard l(_transfer_lock);
_num_running_scanners += inc;
+ g_num_running_scanners.set_value(_num_running_scanners);
}
void ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
@@ -484,6 +493,7 @@ void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDel
// before we call the following if() block.
{
--_num_running_scanners;
+ g_num_running_scanners.set_value(_num_running_scanners);
if (scanner->_scanner->need_to_close()) {
--_num_unfinished_scanners;
if (_num_unfinished_scanners == 0) {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index ff364a4af83..d8678bc0dc3 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -332,8 +332,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
// judge if we need to yield. So we record all raw data read in this round
// scan, if this exceeds row number or bytes threshold, we yield this thread.
std::vector<vectorized::BlockUPtr> blocks;
- int64_t raw_rows_read = scanner->get_rows_read();
- int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
int64_t raw_bytes_read = 0;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
int num_rows_in_block = 0;
@@ -347,11 +345,10 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
// queue, it will affect query latency and query concurrency for example ssb 3.3.
auto should_do_scan = [&, batch_size = state->batch_size(),
time = state->wait_full_block_schedule_times()]() {
- if (raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold) {
+ if (raw_bytes_read < raw_bytes_threshold) {
return true;
} else if (num_rows_in_block < batch_size) {
- return raw_bytes_read < raw_bytes_threshold * time &&
- raw_rows_read < raw_rows_threshold * time;
+ return raw_bytes_read < raw_bytes_threshold * time;
}
return false;
};
@@ -400,7 +397,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
blocks.push_back(std::move(block));
}
}
- raw_rows_read = scanner->get_rows_read();
} // end for while
// if we failed, check status.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]