This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new ea676de6de2 Revert "[improvement](scanner_schedule) reduce memory consumption of scanner #24199 (#25547)" (#26613)
ea676de6de2 is described below

commit ea676de6de22ef27005cf44a4abb258ed262ca61
Author: Kang <[email protected]>
AuthorDate: Wed Nov 8 11:04:07 2023 -0600

    Revert "[improvement](scanner_schedule) reduce memory consumption of 
scanner #24199 (#25547)" (#26613)
    
    This reverts commit 9a19581a2c55c6e78e7e2812a45e1bc3842ffec7 to investigate the ANALYZE TABLE WITH SYNC problem.
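    
    For context, a minimal standalone sketch of the scheduling gate this revert
    switches back to, distilled from the scanner_context hunks below (the free
    functions and the byte budget in main() are illustrative stand-ins, not the
    actual ScannerContext members):
    
        #include <cstdint>
        #include <iostream>
        
        // Gate removed by this revert (#24199): require both that the blocks queue
        // holds less than half of its byte budget and that the number of blocks
        // currently being served stays below an estimated cap.
        bool should_be_scheduled(int64_t cur_bytes_in_queue, int64_t max_bytes_in_queue,
                                 int32_t serving_blocks_num, int32_t allowed_blocks_num) {
            return cur_bytes_in_queue < max_bytes_in_queue / 2 &&
                   serving_blocks_num < allowed_blocks_num;
        }
        
        // Gate restored by this revert: only the bytes buffered in the queue matter.
        bool has_enough_space_in_blocks_queue(int64_t cur_bytes_in_queue,
                                              int64_t max_bytes_in_queue) {
            return cur_bytes_in_queue < max_bytes_in_queue / 2;
        }
        
        int main() {
            const int64_t max_bytes = 2LL * 1024 * 1024 * 1024; // illustrative 2 GB budget
            // The queue is well under half of its byte budget, but the serving-block
            // cap is exhausted: only the restored gate still schedules another scanner.
            std::cout << should_be_scheduled(512LL << 20, max_bytes, 64, 32) << "\n";       // 0
            std::cout << has_enough_space_in_blocks_queue(512LL << 20, max_bytes) << "\n";  // 1
        }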
---
 be/src/exec/exec_node.cpp                  |  3 +-
 be/src/pipeline/exec/scan_operator.cpp     |  2 +-
 be/src/runtime/plan_fragment_executor.cpp  |  1 -
 be/src/vec/exec/scan/pip_scanner_context.h |  7 ++-
 be/src/vec/exec/scan/scanner_context.cpp   | 68 +++++++++++++-----------------
 be/src/vec/exec/scan/scanner_context.h     | 28 +++++-------
 be/src/vec/exec/scan/scanner_scheduler.cpp | 12 ++++--
 7 files changed, 55 insertions(+), 66 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index c8f46b2ed12..dc30bf163a5 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -41,7 +41,6 @@
 #include "runtime/runtime_state.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
-#include "util/stack_util.h"
 #include "util/uid_util.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/block.h"
@@ -206,7 +205,7 @@ Status ExecNode::close(RuntimeState* state) {
                   << " already closed";
         return Status::OK();
     }
-    LOG(INFO) << "fragment_instance_id=" << 
print_id(state->fragment_instance_id()) << " closed. ";
+    LOG(INFO) << "fragment_instance_id=" << 
print_id(state->fragment_instance_id()) << " closed";
     _is_closed = true;
 
     Status result;
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 1f15b1d61f8..f34461a9fd2 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -44,7 +44,7 @@ bool ScanOperator::can_read() {
             return true;
         } else {
             if (_node->_scanner_ctx->get_num_running_scanners() == 0 &&
-                _node->_scanner_ctx->should_be_scheduled()) {
+                _node->_scanner_ctx->has_enough_space_in_blocks_queue()) {
                 _node->_scanner_ctx->reschedule_scanner_ctx();
             }
             return _node->ready_to_read(); // there are some blocks to process
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index a98085c342f..2d7f8c79520 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -53,7 +53,6 @@
 #include "util/container_util.hpp"
 #include "util/defer_op.h"
 #include "util/pretty_printer.h"
-#include "util/stack_util.h"
 #include "util/telemetry/telemetry.h"
 #include "util/threadpool.h"
 #include "util/time.h"
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index f52bd3bf3c7..b98c628368e 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -166,6 +166,10 @@ public:
         _free_blocks_memory_usage->add(free_blocks_memory_usage);
     }
 
+    bool has_enough_space_in_blocks_queue() const override {
+        return _current_used_bytes < _max_bytes_in_queue / 2 * _num_parallel_instances;
+    }
+
     void _dispose_coloate_blocks_not_in_queue() override {
         if (_need_colocate_distribute) {
             for (int i = 0; i < _num_parallel_instances; ++i) {
@@ -217,7 +221,8 @@ private:
                     std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
                     _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
                 }
-                _colocate_blocks[loc] = get_free_block();
+                bool get_block_not_empty = true;
+                _colocate_blocks[loc] = get_free_block(&get_block_not_empty, get_block_not_empty);
                 _colocate_mutable_blocks[loc]->set_muatable_columns(
                         _colocate_blocks[loc]->mutate_columns());
             }
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 8deb2153478..478d9fb4cb7 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -52,7 +52,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
           _process_status(Status::OK()),
           _batch_size(state_->batch_size()),
           limit(limit_),
-          _max_bytes_in_queue(max_bytes_in_blocks_queue_ * num_parallel_instances),
+          _max_bytes_in_queue(max_bytes_in_blocks_queue_),
           _scanner_scheduler(state_->exec_env()->scanner_scheduler()),
           _scanners(scanners_),
           _num_parallel_instances(num_parallel_instances) {
@@ -63,21 +63,26 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V
     if (limit < 0) {
         limit = -1;
     }
+}
 
+// After init function call, should not access _parent
+Status ScannerContext::init() {
+    // 1. Calculate max concurrency
+    // TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4
+    // should find a more reasonable value.
     _max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
-    _max_thread_num *= num_parallel_instances;
+    if (_parent->_shared_scan_opt) {
+        DCHECK(_num_parallel_instances > 0);
+        _max_thread_num *= _num_parallel_instances;
+    }
     _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
     DCHECK(_max_thread_num > 0);
     _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
-    // 1. Calculate max concurrency
     // For select * from table limit 10; should just use one thread.
     if (_parent->should_run_serial()) {
         _max_thread_num = 1;
     }
-}
 
-// After init function call, should not access _parent
-Status ScannerContext::init() {
     _scanner_profile = _parent->_scanner_profile;
     _scanner_sched_counter = _parent->_scanner_sched_counter;
     _scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
@@ -99,9 +104,6 @@ Status ScannerContext::init() {
             limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
     _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size;
     _free_blocks_capacity = _max_thread_num * _block_per_scanner;
-    auto block = get_free_block();
-    _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
-    return_free_block(std::move(block));
 
 #ifndef BE_TEST
     // 3. get thread token
@@ -121,33 +123,27 @@ Status ScannerContext::init() {
     return Status::OK();
 }
 
-vectorized::BlockUPtr ScannerContext::get_free_block() {
+vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
+                                                     bool get_block_not_empty) {
     vectorized::BlockUPtr block;
     if (_free_blocks.try_dequeue(block)) {
-        DCHECK(block->mem_reuse());
-        _free_blocks_memory_usage->add(-block->allocated_bytes());
-        _serving_blocks_num++;
-        return block;
+        if (!get_block_not_empty || block->mem_reuse()) {
+            _free_blocks_capacity--;
+            _free_blocks_memory_usage->add(-block->allocated_bytes());
+            return block;
+        }
     }
 
-    block = vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
-                                             true /*ignore invalid slots*/);
     COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
-
-    _serving_blocks_num++;
-    return block;
+    return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size,
+                                            true /*ignore invalid slots*/);
 }
 
 void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) {
-    _serving_blocks_num--;
-    if (block->mem_reuse()) {
-        // Only put blocks with schema to free blocks, because colocate blocks
-        // need schema.
-        _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
-        block->clear_column_data();
-        _free_blocks_memory_usage->add(block->allocated_bytes());
-        _free_blocks.enqueue(std::move(block));
-    }
+    block->clear_column_data();
+    _free_blocks_memory_usage->add(block->allocated_bytes());
+    _free_blocks.enqueue(std::move(block));
+    ++_free_blocks_capacity;
 }
 
 void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) {
@@ -180,7 +176,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     // (if the scheduler continues to schedule, it will cause a lot of busy running).
     // At this point, consumers are required to trigger new scheduling to ensure that
     // data can be continuously fetched.
-    if (should_be_scheduled() && _num_running_scanners == 0) {
+    if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
@@ -188,7 +184,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
             set_status_on_error(state, false);
         }
     }
-
     // Wait for block from queue
     if (wait) {
         SCOPED_TIMER(_scanner_wait_batch_timer);
@@ -212,7 +207,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
 
         auto block_bytes = (*block)->allocated_bytes();
         _cur_bytes_in_queue -= block_bytes;
-
         _queued_blocks_memory_usage->add(-block_bytes);
         return Status::OK();
     } else {
@@ -359,13 +353,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
         _scanners.push_front(scanner);
     }
     std::lock_guard l(_transfer_lock);
-
-    // In pipeline engine, doris will close scanners when `no_schedule`.
-    // We have to decrease _num_running_scanners before schedule, otherwise
-    // schedule does not woring due to _num_running_scanners.
-    _num_running_scanners--;
-
-    if (should_be_scheduled()) {
+    if (has_enough_space_in_blocks_queue()) {
         auto state = _scanner_scheduler->submit(this);
         if (state.ok()) {
             _num_scheduling_ctx++;
@@ -385,6 +373,8 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
         _is_finished = true;
         _blocks_queue_added_cv.notify_one();
     }
+    // In pipeline engine, doris will close scanners when `no_schedule`.
+    _num_running_scanners--;
     _ctx_finish_cv.notify_one();
 }
 
@@ -394,7 +384,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
     {
         // If there are enough space in blocks queue,
         // the scanner number depends on the _free_blocks numbers
-        thread_slot_num = get_available_thread_slot_num();
+        thread_slot_num = cal_thread_slot_num_by_free_block_num();
     }
 
     // 2. get #thread_slot_num scanners from ctx->scanners
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index a345bfc03dd..3aad0d6263f 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -62,12 +62,12 @@ public:
     ScannerContext(RuntimeState* state_, VScanNode* parent,
                    const TupleDescriptor* output_tuple_desc,
                    const std::list<VScannerSPtr>& scanners_, int64_t limit_,
-                   int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 1);
+                   int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 0);
 
     virtual ~ScannerContext() = default;
     virtual Status init();
 
-    vectorized::BlockUPtr get_free_block();
+    vectorized::BlockUPtr get_free_block(bool* has_free_block, bool get_not_empty_block = false);
     void return_free_block(std::unique_ptr<vectorized::Block> block);
 
     // Append blocks from scanners to the blocks queue.
@@ -136,25 +136,20 @@ public:
     virtual bool empty_in_queue(int id);
 
     // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan
-    inline bool should_be_scheduled() const {
-        return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
-               (_serving_blocks_num < allowed_blocks_num());
+    virtual inline bool has_enough_space_in_blocks_queue() const {
+        return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
     }
 
-    int get_available_thread_slot_num() {
+    int cal_thread_slot_num_by_free_block_num() {
         int thread_slot_num = 0;
-        thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) / _block_per_scanner;
+        thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) / _block_per_scanner;
         thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
+        if (thread_slot_num <= 0) {
+            thread_slot_num = 1;
+        }
         return thread_slot_num;
     }
 
-    int32_t allowed_blocks_num() const {
-        int32_t blocks_num = std::min(_free_blocks_capacity,
-                                      int32_t((_max_bytes_in_queue + _estimated_block_bytes - 1) /
-                                              _estimated_block_bytes));
-        return blocks_num;
-    }
-
     void reschedule_scanner_ctx();
 
     // the unique id of this context
@@ -208,12 +203,10 @@ protected:
 
     // Lazy-allocated blocks for all scanners to share, for memory reuse.
     moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
-    std::atomic<int32_t> _serving_blocks_num = 0;
     // The current number of free blocks available to the scanners.
     // Used to limit the memory usage of the scanner.
     // NOTE: this is NOT the size of `_free_blocks`.
-    int32_t _free_blocks_capacity = 0;
-    int64_t _estimated_block_bytes = 0;
+    std::atomic_int32_t _free_blocks_capacity = 0;
 
     int _batch_size;
     // The limit from SQL's limit clause
@@ -238,7 +231,6 @@ protected:
     int64_t _cur_bytes_in_queue = 0;
     // The max limit bytes of blocks in blocks queue
     const int64_t _max_bytes_in_queue;
-    std::atomic<int64_t> _bytes_allocated = 0;
 
     doris::vectorized::ScannerScheduler* _scanner_scheduler;
     // List "scanners" saves all "unfinished" scanners.
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 2529ce67e5e..3481128a1d2 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -321,6 +321,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
     int64_t raw_bytes_read = 0;
     int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
+    bool has_free_block = true;
     int num_rows_in_block = 0;
 
     // Only set to true when ctx->done() return true.
@@ -330,8 +331,9 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     bool should_stop = false;
     // Has to wait at least one full block, or it will cause a lot of schedule task in priority
     // queue, it will affect query latency and query concurrency for example ssb 3.3.
-    while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold &&
-           num_rows_in_block < state->batch_size()) {
+    while (!eos && raw_bytes_read < raw_bytes_threshold &&
+           ((raw_rows_read < raw_rows_threshold && has_free_block) ||
+            num_rows_in_block < state->batch_size())) {
         if (UNLIKELY(ctx->done())) {
             // No need to set status on error here.
             // Because done() maybe caused by "should_stop"
@@ -339,7 +341,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
             break;
         }
 
-        BlockUPtr block = ctx->get_free_block();
+        BlockUPtr block = ctx->get_free_block(&has_free_block);
         status = scanner->get_block(state, block.get(), &eos);
         VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << 
eos;
         // The VFileScanner for external table may try to open not exist files,
@@ -355,11 +357,12 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
         if (status.is<ErrorCode::NOT_FOUND>()) {
             // The only case in this "if" branch is external table file delete 
and fe cache has not been updated yet.
             // Set status to OK.
+            LOG(INFO) << "scan range not found: " << 
scanner->get_current_scan_range_name();
             status = Status::OK();
             eos = true;
         }
 
-        raw_bytes_read += block->allocated_bytes();
+        raw_bytes_read += block->bytes();
         num_rows_in_block += block->rows();
         if (UNLIKELY(block->rows() == 0)) {
             ctx->return_free_block(std::move(block));
@@ -394,6 +397,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     if (eos || should_stop) {
         scanner->mark_to_need_to_close();
     }
+
     ctx->push_back_scanner_and_reschedule(scanner);
 }
 

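A side note on the scan-loop stop condition touched in the scanner_scheduler hunk
above, rewritten here as standalone predicates for comparison (the function names and
the flattened parameters are illustrative; in _scanner_scan these are locals, and
batch_size stands in for state->batch_size()):

    #include <cstdint>
    #include <iostream>

    // Condition removed by this revert (#24199): stop as soon as any budget
    // (bytes, rows, or the current batch) is exhausted.
    bool keep_scanning_reverted(bool eos, int64_t raw_bytes_read, int64_t raw_bytes_threshold,
                                int64_t raw_rows_read, int64_t raw_rows_threshold,
                                int num_rows_in_block, int batch_size) {
        return !eos && raw_bytes_read < raw_bytes_threshold &&
               raw_rows_read < raw_rows_threshold && num_rows_in_block < batch_size;
    }

    // Condition restored by this revert: within the byte budget, keep scanning while
    // a free block is available and the row budget is not spent, or at least until
    // one full batch has been accumulated.
    bool keep_scanning_restored(bool eos, int64_t raw_bytes_read, int64_t raw_bytes_threshold,
                                int64_t raw_rows_read, int64_t raw_rows_threshold,
                                bool has_free_block, int num_rows_in_block, int batch_size) {
        return !eos && raw_bytes_read < raw_bytes_threshold &&
               ((raw_rows_read < raw_rows_threshold && has_free_block) ||
                num_rows_in_block < batch_size);
    }

    int main() {
        // Row budget spent and no free block, but the batch is only half full: the
        // restored condition keeps scanning until the batch fills, the reverted one stops.
        std::cout << keep_scanning_reverted(false, 0, 1 << 20, 5000, 4096, 2048, 4096) << "\n";        // 0
        std::cout << keep_scanning_restored(false, 0, 1 << 20, 5000, 4096, false, 2048, 4096) << "\n"; // 1
    }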

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
