This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e40f7f87597 [Opt](scanner-scheduler) Opt scanner scheduler starvation issue. (#40641)
e40f7f87597 is described below

commit e40f7f875978ab6750aec93a4187d9e08bbc55d6
Author: Qi Chen <[email protected]>
AuthorDate: Thu Sep 26 22:48:54 2024 +0800

    [Opt](scanner-scheduler) Opt scanner scheduler starvation issue. (#40641)
    
    ## Proposed changes
    
    ### Issue
    When a scanner scheduler thread is stuck executing one scan task, other
    scan tasks starve and get no chance to run, which affects other
    queries. A scan task tries to read as much data as possible per run
    to reduce the overhead of scheduling switches; currently it tries to
    obtain up to 10MB of data (`doris_scanner_row_bytes`). However, if a
    query scans a table with many rows but a highly selective filter,
    most rows are filtered out and the task never accumulates 10MB. It
    keeps reading batches and evaluating filter expressions, which starves
    other scan tasks.
    
    ### Solution
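    The solution is to measure the task's run time with a
    `MonotonicStopWatch` and compare it against `doris_scanner_max_run_time_ms`.
    After running for at most 1s (the default), the task yields so that
    other tasks can run. Time-consuming work inside a scan task must be done
    in slices so that this check is reached, e.g. the Parquet lazy-read loop
    now stops after reading `doris_scanner_row_num` raw rows.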
---
 be/src/common/config.cpp                                 | 2 ++
 be/src/common/config.h                                   | 2 ++
 be/src/vec/exec/format/parquet/vparquet_group_reader.cpp | 5 +++++
 be/src/vec/exec/scan/scanner_scheduler.cpp               | 6 ++++++
 be/src/vec/exec/scan/vfile_scanner.cpp                   | 2 +-
 5 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a742b1dc8db..7fed3b30824 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -284,6 +284,8 @@ DEFINE_mInt32(doris_scan_range_max_mb, "1024");
 DEFINE_mInt32(doris_scanner_row_num, "16384");
 // single read execute fragment row bytes
 DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
+// single read execute fragment max run time in milliseconds
+DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000");
 // (Advanced) Maximum size of per-query receive-side buffer
 DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
 DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 83cebf8d7bc..09b2311721e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -332,6 +332,8 @@ DECLARE_mInt32(doris_scan_range_max_mb);
 DECLARE_mInt32(doris_scanner_row_num);
 // single read execute fragment row bytes
 DECLARE_mInt32(doris_scanner_row_bytes);
+// single read execute fragment max run time in milliseconds
+DECLARE_mInt32(doris_scanner_max_run_time_ms);
 // (Advanced) Maximum size of per-query receive-side buffer
 DECLARE_mInt32(exchg_node_buffer_size_bytes);
 DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index b993f4cd31e..08ecb601f39 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -455,6 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
         columns_to_filter[i] = i;
     }
     IColumn::Filter result_filter;
+    size_t pre_raw_read_rows = 0;
     while (!_state->is_cancelled()) {
         // read predicate columns
         pre_read_rows = 0;
@@ -466,6 +467,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
             DCHECK_EQ(pre_eof, true);
             break;
         }
+        pre_raw_read_rows += pre_read_rows;
         RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
                                                 
_lazy_read_ctx.predicate_partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
@@ -518,6 +520,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
             Block::erase_useless_column(block, origin_column_num);
 
             if (!pre_eof) {
+                if (pre_raw_read_rows >= config::doris_scanner_row_num) {
+                    break;
+                }
                 // If continuous batches are skipped, we can cache them to skip a whole page
                 _cached_filtered_rows += pre_read_rows;
             } else { // pre_eof
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 34a12fbd978..23ed5db0798 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -247,6 +247,8 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
         Thread::set_thread_nice_value();
     }
 #endif
+    MonotonicStopWatch max_run_time_watch;
+    max_run_time_watch.start();
     scanner->update_wait_worker_timer();
     scanner->start_scan_cpu_timer();
     Status status = Status::OK();
@@ -281,6 +283,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     eos = true;
                     break;
                 }
+                if (max_run_time_watch.elapsed_time() >
+                    config::doris_scanner_max_run_time_ms * 1e6) {
+                    break;
+                }
                 BlockUPtr free_block = ctx->get_free_block(first_read);
                 if (free_block == nullptr) {
                     break;
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 52aa752935e..ffc88d07cab 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -333,8 +333,8 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool*
                 // or not found in the file column schema.
                 RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block));
             }
-            break;
         }
+        break;
     } while (true);
 
     // Update filtered rows and unselected rows for load, reset counter.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to