This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e40f7f87597 [Opt](scanner-scheduler) Opt scanner scheduler starvation
issue. (#40641)
e40f7f87597 is described below
commit e40f7f875978ab6750aec93a4187d9e08bbc55d6
Author: Qi Chen <[email protected]>
AuthorDate: Thu Sep 26 22:48:54 2024 +0800
[Opt](scanner-scheduler) Opt scanner scheduler starvation issue. (#40641)
## Proposed changes
### Issue
When a scanner scheduler thread is stuck executing one scan task, other scan
tasks starve and get no chance to run, which affects other queries.
Currently, a scan task tries to scan as much data as possible per run to
reduce the overhead of scheduling switches: it aims to obtain up to 10MB of
data (controlled by `doris_scanner_row_bytes`). However, if a query scans a
table with many rows but a very high filtering rate, the filter discards most
of the data and the task may never accumulate 10MB. It keeps reading rows and
evaluating filter expressions, which starves other scan tasks.
### Solution
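This change measures a scan task's run time with a `MonotonicStopWatch` and
checks it against the new `doris_scanner_max_run_time_ms` config. After
running for at most that long (1s by default), the task yields so that other
tasks can run. Time-consuming work inside a scan task must therefore be done
in slices. For example, the Parquet lazy-read loop now stops after reading
`doris_scanner_row_num` raw rows, even when all of them were filtered out.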
---
be/src/common/config.cpp | 2 ++
be/src/common/config.h | 2 ++
be/src/vec/exec/format/parquet/vparquet_group_reader.cpp | 5 +++++
be/src/vec/exec/scan/scanner_scheduler.cpp | 6 ++++++
be/src/vec/exec/scan/vfile_scanner.cpp | 2 +-
5 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a742b1dc8db..7fed3b30824 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -284,6 +284,8 @@ DEFINE_mInt32(doris_scan_range_max_mb, "1024");
DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
+// single read execute fragment max run time milliseconds
+DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 83cebf8d7bc..09b2311721e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -332,6 +332,8 @@ DECLARE_mInt32(doris_scan_range_max_mb);
DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
+// single read execute fragment max run time milliseconds
+DECLARE_mInt32(doris_scanner_max_run_time_ms);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index b993f4cd31e..08ecb601f39 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -455,6 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
columns_to_filter[i] = i;
}
IColumn::Filter result_filter;
+ size_t pre_raw_read_rows = 0;
while (!_state->is_cancelled()) {
// read predicate columns
pre_read_rows = 0;
@@ -466,6 +467,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
DCHECK_EQ(pre_eof, true);
break;
}
+ pre_raw_read_rows += pre_read_rows;
RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
_lazy_read_ctx.predicate_partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
@@ -518,6 +520,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
Block::erase_useless_column(block, origin_column_num);
if (!pre_eof) {
+ if (pre_raw_read_rows >= config::doris_scanner_row_num) {
+ break;
+ }
// If continuous batches are skipped, we can cache them to
skip a whole page
_cached_filtered_rows += pre_read_rows;
} else { // pre_eof
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 34a12fbd978..23ed5db0798 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -247,6 +247,8 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
Thread::set_thread_nice_value();
}
#endif
+ MonotonicStopWatch max_run_time_watch;
+ max_run_time_watch.start();
scanner->update_wait_worker_timer();
scanner->start_scan_cpu_timer();
Status status = Status::OK();
@@ -281,6 +283,10 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
eos = true;
break;
}
+ if (max_run_time_watch.elapsed_time() >
+ config::doris_scanner_max_run_time_ms * 1e6) {
+ break;
+ }
BlockUPtr free_block = ctx->get_free_block(first_read);
if (free_block == nullptr) {
break;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 52aa752935e..ffc88d07cab 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -333,8 +333,8 @@ Status VFileScanner::_get_block_wrapped(RuntimeState*
state, Block* block, bool*
// or not found in the file column schema.
RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
}
- break;
}
+ break;
} while (true);
// Update filtered rows and unselected rows for load, reset counter.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]