This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 646ba2cc88 [bugfix](scannode) 1. make rows_read correct 2. use single 
scanner if has limit clause (#16473)
646ba2cc88 is described below

commit 646ba2cc88bf2ca029f9edabddf5fef4adeb0abd
Author: yiguolei <[email protected]>
AuthorDate: Thu Feb 9 14:12:18 2023 +0800

    [bugfix](scannode) 1. make rows_read correct 2. use single scanner if has 
limit clause (#16473)
    
    Make rows_read correct so that the scheduler can use it correctly.
    Use a single scanner if the query has a limit clause. Move this logic from
    the fragment context to the scan node.
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
    Co-authored-by: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
---
 be/src/runtime/fragment_mgr.cpp            | 34 +-----------------------------
 be/src/vec/exec/scan/scanner_context.cpp   |  4 ++++
 be/src/vec/exec/scan/scanner_scheduler.cpp |  4 ++--
 be/src/vec/exec/scan/vscan_node.h          | 14 +++++++++++-
 be/src/vec/exec/scan/vscanner.cpp          |  4 ++--
 be/src/vec/exec/scan/vscanner.h            |  3 ---
 6 files changed, 22 insertions(+), 41 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index ea8eb9046d..e1bd3651db 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -773,43 +773,11 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
 void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params,
                                         QueryFragmentsCtx* fragments_ctx) {
 #ifndef BE_TEST
-    // set thread token
-    // the thread token will be set if
-    // 1. the cpu_limit is set, or
-    // 2. the limit is very small ( < 1024)
     // If the token is set, the scan task will use limited_scan_pool in 
scanner scheduler.
     // Otherwise, the scan task will use local/remote scan pool in scanner 
scheduler
-    int concurrency = 1;
-    bool is_serial = false;
-    bool need_token = false;
     if (params.query_options.__isset.resource_limit &&
         params.query_options.resource_limit.__isset.cpu_limit) {
-        concurrency = params.query_options.resource_limit.cpu_limit;
-        need_token = true;
-    } else {
-        concurrency = config::doris_scanner_thread_pool_thread_num;
-    }
-    if (params.__isset.fragment && params.fragment.__isset.plan &&
-        params.fragment.plan.nodes.size() > 0) {
-        for (auto& node : params.fragment.plan.nodes) {
-            // Only for SCAN NODE
-            if (!_is_scan_node(node.node_type)) {
-                continue;
-            }
-            if (node.__isset.conjuncts && !node.conjuncts.empty()) {
-                // If the scan node has where predicate, do not set concurrency
-                continue;
-            }
-            if (node.limit > 0 && node.limit < 1024) {
-                concurrency = 1;
-                is_serial = true;
-                need_token = true;
-                break;
-            }
-        }
-    }
-    if (need_token) {
-        fragments_ctx->set_thread_token(concurrency, is_serial);
+        
fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit, 
false);
     }
 #endif
 }
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index edf7c61c60..25a23f603c 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -36,6 +36,10 @@ Status ScannerContext::init() {
     // should find a more reasonable value.
     _max_thread_num =
             std::min(config::doris_scanner_thread_pool_thread_num / 4, 
(int32_t)_scanners.size());
+    // For select * from table limit 10; should just use one thread.
+    if (_parent->should_run_serial()) {
+        _max_thread_num = 1;
+    }
 
     // 2. Calculate how many blocks need to be preallocated.
     // The calculation logic is as follows:
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 7de79247b6..085d647262 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -274,7 +274,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* 
scheduler, ScannerContext
     // judge if we need to yield. So we record all raw data read in this round
     // scan, if this exceeds row number or bytes threshold, we yield this 
thread.
     std::vector<vectorized::Block*> blocks;
-    int64_t raw_rows_read = scanner->raw_rows_read();
+    int64_t raw_rows_read = scanner->get_rows_read();
     int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
     int64_t raw_bytes_read = 0;
     int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
@@ -332,7 +332,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* 
scheduler, ScannerContext
                 blocks.push_back(block);
             }
         }
-        raw_rows_read = scanner->raw_rows_read();
+        raw_rows_read = scanner->get_rows_read();
     } // end for while
 
     // if we failed, check status.
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index 2ac36d35b7..c8d7269087 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -49,7 +49,14 @@ struct FilterPredicates {
 class VScanNode : public ExecNode {
 public:
     VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& 
descs)
-            : ExecNode(pool, tnode, descs), 
_runtime_filter_descs(tnode.runtime_filters) {}
+            : ExecNode(pool, tnode, descs), 
_runtime_filter_descs(tnode.runtime_filters) {
+        if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
+            // Which means the request could be fulfilled in a single segment 
iterator request.
+            if (tnode.limit > 0 && tnode.limit < 1024) {
+                _should_run_serial = true;
+            }
+        }
+    }
     virtual ~VScanNode() = default;
 
     friend class VScanner;
@@ -95,6 +102,8 @@ public:
 
     Status try_close();
 
+    bool should_run_serial() const { return _should_run_serial; }
+
     enum class PushDownType {
         // The predicate can not be pushed down to data source
         UNACCEPTABLE,
@@ -243,6 +252,9 @@ protected:
 
     bool _need_agg_finalize = true;
     bool _blocked_by_rf = false;
+    // If the query is like `select * from table limit 10;`, it should run in a
+    // single scanner, to avoid starting many scanners that would do lots of useless reads.
+    bool _should_run_serial = false;
 
     // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in 
this vector
     // so that it will be destroyed uniformly at the end of the query.
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index dfb260de50..f3c2180b43 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -37,7 +37,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, 
bool* eof) {
     // only empty block should be here
     DCHECK(block->rows() == 0);
     SCOPED_RAW_TIMER(&_per_scanner_timer);
-    int64_t raw_rows_threshold = raw_rows_read() + 
config::doris_scanner_row_num;
+    int64_t rows_read_threshold = _num_rows_read + 
config::doris_scanner_row_num;
     if (!block->mem_reuse()) {
         for (const auto slot_desc : _output_tuple_desc->slots()) {
             if (!slot_desc->need_materialize()) {
@@ -71,7 +71,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, 
bool* eof) {
                 _num_rows_return += block->rows();
             }
         } while (!state->is_cancelled() && block->rows() == 0 && !(*eof) &&
-                 raw_rows_read() < raw_rows_threshold);
+                 _num_rows_read < rows_read_threshold);
     }
 
     if (state->is_cancelled()) {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index a91b87c0b5..8c01727ba8 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -47,9 +47,6 @@ public:
 
     virtual Status close(RuntimeState* state);
 
-    // Subclass must implement this to return the current rows read
-    virtual int64_t raw_rows_read() { return 0; }
-
 protected:
     // Subclass should implement this to return data.
     virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* 
eof) = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to