This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 54fe1a166b9 [Refactor](scan) refactor scan scheduler to improve 
performance (#27948)
54fe1a166b9 is described below

commit 54fe1a166b984fa349c76d5becad94859a2d7fc7
Author: HappenLee <[email protected]>
AuthorDate: Tue Dec 5 13:03:16 2023 +0800

    [Refactor](scan) refactor scan scheduler to improve performance (#27948)
    
    * [Refactor](scan) refactor scan scheduler to improve performance
    
    * fix pipeline x core
---
 be/src/runtime/runtime_state.h                     |   3 +
 be/src/vec/exec/scan/pip_scanner_context.h         |  22 ++++
 be/src/vec/exec/scan/scanner_context.cpp           | 125 ++++++++++++---------
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  19 +++-
 .../main/java/org/apache/doris/qe/Coordinator.java |   1 +
 .../java/org/apache/doris/qe/SessionVariable.java  |   9 ++
 gensrc/thrift/PaloInternalService.thrift           |   2 +
 7 files changed, 125 insertions(+), 56 deletions(-)

diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index b8ab49ff276..e37883abbe1 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -107,6 +107,9 @@ public:
     const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
     void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
     int batch_size() const { return _query_options.batch_size; }
+    int wait_full_block_schedule_times() const {
+        return _query_options.wait_full_block_schedule_times;
+    }
     bool abort_on_error() const { return _query_options.abort_on_error; }
     bool abort_on_default_limit_exceeded() const {
         return _query_options.abort_on_default_limit_exceeded;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 681fc09739a..57d632a03ea 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -63,6 +63,7 @@ public:
             }
         }
 
+        std::vector<vectorized::BlockUPtr> merge_blocks;
         {
             std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
             if (_blocks_queues[id].empty()) {
@@ -76,6 +77,18 @@ public:
             *block = std::move(_blocks_queues[id].front());
             _blocks_queues[id].pop_front();
 
+            auto rows = (*block)->rows();
+            while (!_blocks_queues[id].empty()) {
+                const auto add_rows = (*_blocks_queues[id].front()).rows();
+                if (rows + add_rows < state->batch_size()) {
+                    rows += add_rows;
+                    
merge_blocks.emplace_back(std::move(_blocks_queues[id].front()));
+                    _blocks_queues[id].pop_front();
+                } else {
+                    break;
+                }
+            }
+
             if (_blocks_queues[id].empty()) {
                 this->reschedule_scanner_ctx();
                 if (_dependency) {
@@ -83,7 +96,16 @@ public:
                 }
             }
         }
+
         _current_used_bytes -= (*block)->allocated_bytes();
+        if (!merge_blocks.empty()) {
+            vectorized::MutableBlock m(block->get());
+            for (auto& merge_block : merge_blocks) {
+                _current_used_bytes -= merge_block->allocated_bytes();
+                static_cast<void>(m.merge(*merge_block));
+                return_free_block(std::move(merge_block));
+            }
+        }
 
         return Status::OK();
     }
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 0d9b0351dd8..7cad8242c1f 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -209,65 +209,90 @@ bool ScannerContext::empty_in_queue(int id) {
 
 Status ScannerContext::get_block_from_queue(RuntimeState* state, 
vectorized::BlockUPtr* block,
                                             bool* eos, int id, bool wait) {
-    std::unique_lock l(_transfer_lock);
-    // Normally, the scanner scheduler will schedule ctx.
-    // But when the amount of data in the blocks queue exceeds the upper limit,
-    // the scheduler will stop scheduling.
-    // (if the scheduler continues to schedule, it will cause a lot of busy 
running).
-    // At this point, consumers are required to trigger new scheduling to 
ensure that
-    // data can be continuously fetched.
-    int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
-    int32_t serving_blocks_num = _serving_blocks_num;
-    bool to_be_schedule = should_be_scheduled();
-    int num_running_scanners = _num_running_scanners;
-
-    bool is_scheduled = false;
-    if (to_be_schedule && _num_running_scanners == 0) {
-        is_scheduled = true;
-        auto state = _scanner_scheduler->submit(this);
-        if (state.ok()) {
-            _num_scheduling_ctx++;
-        } else {
-            set_status_on_error(state, false);
+    std::vector<vectorized::BlockUPtr> merge_blocks;
+    {
+        std::unique_lock l(_transfer_lock);
+        // Normally, the scanner scheduler will schedule ctx.
+        // But when the amount of data in the blocks queue exceeds the upper 
limit,
+        // the scheduler will stop scheduling.
+        // (if the scheduler continues to schedule, it will cause a lot of 
busy running).
+        // At this point, consumers are required to trigger new scheduling to 
ensure that
+        // data can be continuously fetched.
+        int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
+        int32_t serving_blocks_num = _serving_blocks_num;
+        bool to_be_schedule = should_be_scheduled();
+        int num_running_scanners = _num_running_scanners;
+
+        bool is_scheduled = false;
+        if (to_be_schedule && _num_running_scanners == 0) {
+            is_scheduled = true;
+            auto state = _scanner_scheduler->submit(this);
+            if (state.ok()) {
+                _num_scheduling_ctx++;
+            } else {
+                set_status_on_error(state, false);
+            }
         }
-    }
 
-    // Wait for block from queue
-    if (wait) {
-        // scanner batch wait time
-        SCOPED_TIMER(_scanner_wait_batch_timer);
-        while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
-                 state->is_cancelled())) {
-            if (!is_scheduled && _num_running_scanners == 0 && 
should_be_scheduled()) {
-                LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
-                          << ", serving_blocks_num " << serving_blocks_num
-                          << ", num_running_scanners " << num_running_scanners
-                          << ", to_be_scheudle " << to_be_schedule << 
(void*)this;
+        // Wait for block from queue
+        if (wait) {
+            // scanner batch wait time
+            SCOPED_TIMER(_scanner_wait_batch_timer);
+            while (!(!_blocks_queue.empty() || _is_finished || !status().ok() 
||
+                     state->is_cancelled())) {
+                if (!is_scheduled && _num_running_scanners == 0 && 
should_be_scheduled()) {
+                    LOG(INFO) << "fatal, cur_bytes_in_queue " << 
cur_bytes_in_queue
+                              << ", serving_blocks_num " << serving_blocks_num
+                              << ", num_running_scanners " << 
num_running_scanners
+                              << ", to_be_scheudle " << to_be_schedule << 
(void*)this;
+                }
+                _blocks_queue_added_cv.wait_for(l, 1s);
             }
-            _blocks_queue_added_cv.wait_for(l, 1s);
         }
-    }
 
-    if (state->is_cancelled()) {
-        set_status_on_error(Status::Cancelled("cancelled"), false);
-    }
-
-    if (!status().ok()) {
-        return status();
-    }
+        if (state->is_cancelled()) {
+            set_status_on_error(Status::Cancelled("cancelled"), false);
+        }
 
-    if (!_blocks_queue.empty()) {
-        *block = std::move(_blocks_queue.front());
-        _blocks_queue.pop_front();
+        if (!status().ok()) {
+            return status();
+        }
 
-        auto block_bytes = (*block)->allocated_bytes();
-        _cur_bytes_in_queue -= block_bytes;
+        if (!_blocks_queue.empty()) {
+            *block = std::move(_blocks_queue.front());
+            _blocks_queue.pop_front();
+            auto block_bytes = (*block)->allocated_bytes();
+            _cur_bytes_in_queue -= block_bytes;
+            _queued_blocks_memory_usage->add(-block_bytes);
+
+            auto rows = (*block)->rows();
+            while (!_blocks_queue.empty()) {
+                auto& add_block = _blocks_queue.front();
+                const auto add_rows = (*add_block).rows();
+                if (rows + add_rows < state->batch_size()) {
+                    rows += add_rows;
+                    block_bytes = (*add_block).allocated_bytes();
+                    _cur_bytes_in_queue -= block_bytes;
+                    _queued_blocks_memory_usage->add(-block_bytes);
+                    merge_blocks.emplace_back(std::move(add_block));
+                    _blocks_queue.pop_front();
+                } else {
+                    break;
+                }
+            }
+        } else {
+            *eos = _is_finished;
+        }
+    }
 
-        _queued_blocks_memory_usage->add(-block_bytes);
-        return Status::OK();
-    } else {
-        *eos = _is_finished;
+    if (!merge_blocks.empty()) {
+        vectorized::MutableBlock m(block->get());
+        for (auto& merge_block : merge_blocks) {
+            static_cast<void>(m.merge(*merge_block));
+            return_free_block(std::move(merge_block));
+        }
     }
+
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index b3cfc2e48a6..6b7bc232a2e 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -347,8 +347,18 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* 
scheduler, ScannerContext
     bool should_stop = false;
     // Has to wait at least one full block, or it will cause a lot of schedule 
task in priority
     // queue, it will affect query latency and query concurrency for example 
ssb 3.3.
-    while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < 
raw_rows_threshold &&
-           num_rows_in_block < state->batch_size()) {
+    auto should_do_scan = [&, batch_size = state->batch_size(),
+                           time = state->wait_full_block_schedule_times()]() {
+        if (raw_bytes_read < raw_bytes_threshold && raw_rows_read < 
raw_rows_threshold) {
+            return true;
+        } else if (num_rows_in_block < batch_size) {
+            return raw_bytes_read < raw_bytes_threshold * time &&
+                   raw_rows_read < raw_rows_threshold * time;
+        }
+        return false;
+    };
+
+    while (!eos && should_do_scan()) {
         // TODO llj task group should should_yield?
         if (UNLIKELY(ctx->done())) {
             // No need to set status on error here.
@@ -384,10 +394,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* 
scheduler, ScannerContext
             ctx->return_free_block(std::move(block));
         } else {
             if (!blocks.empty() && blocks.back()->rows() + block->rows() <= 
state->batch_size()) {
-                status = 
vectorized::MutableBlock(blocks.back().get()).merge(*block);
-                if (!status.ok()) {
-                    break;
-                }
+                
static_cast<void>(vectorized::MutableBlock(blocks.back().get()).merge(*block));
                 ctx->return_free_block(std::move(block));
             } else {
                 blocks.push_back(std::move(block));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index f3622b62fbb..1ca37d7e70d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -374,6 +374,7 @@ public class Coordinator implements CoordInterface {
         this.queryOptions.setExecutionTimeout(context.getExecTimeout());
         
this.queryOptions.setEnableScanNodeRunSerial(context.getSessionVariable().isEnableScanRunSerial());
         
this.queryOptions.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
+        
this.queryOptions.setWaitFullBlockScheduleTimes(context.getSessionVariable().getWaitFullBlockScheduleTimes());
     }
 
     public ConnectContext getConnectContext() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c8ba8f4943b..ff957244c92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -468,6 +468,8 @@ public class SessionVariable implements Serializable, 
Writable {
     // this session variable is set to true.
     public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = 
"fallback_other_replica_when_fixed_corrupt";
 
+    public static final String WAIT_FULL_BLOCK_SCHEDULE_TIMES = 
"wait_full_block_schedule_times";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,
@@ -834,6 +836,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = USE_RF_DEFAULT)
     public boolean useRuntimeFilterDefaultSize = false;
 
+    @VariableMgr.VarAttr(name = WAIT_FULL_BLOCK_SCHEDULE_TIMES)
+    public int waitFullBlockScheduleTimes = 2;
+
     public int getBeNumberForTest() {
         return beNumberForTest;
     }
@@ -2168,6 +2173,10 @@ public class SessionVariable implements Serializable, 
Writable {
         return sqlDialect;
     }
 
+    public int getWaitFullBlockScheduleTimes() {
+        return waitFullBlockScheduleTimes;
+    }
+
     public ParseDialect.Dialect getSqlParseDialect() {
         return ParseDialect.Dialect.getByName(sqlDialect);
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index d1a779e285a..401eb548a01 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -257,6 +257,8 @@ struct TQueryOptions {
   90: optional bool skip_missing_version = false;
 
   91: optional bool runtime_filter_wait_infinitely = false;
+
+  92: optional i32 wait_full_block_schedule_times = 1;
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to