This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new ea676de6de2 Revert "[improvement](scanner_schedule) reduce memory
consumption of scanner #24199 (#25547)" (#26613)
ea676de6de2 is described below
commit ea676de6de22ef27005cf44a4abb258ed262ca61
Author: Kang <[email protected]>
AuthorDate: Wed Nov 8 11:04:07 2023 -0600
Revert "[improvement](scanner_schedule) reduce memory consumption of
scanner #24199 (#25547)" (#26613)
This reverts commit 9a19581a2c55c6e78e7e2812a45e1bc3842ffec7 to investigate
the ANALYZE TABLE WITH SYNC problem.
---
be/src/exec/exec_node.cpp | 3 +-
be/src/pipeline/exec/scan_operator.cpp | 2 +-
be/src/runtime/plan_fragment_executor.cpp | 1 -
be/src/vec/exec/scan/pip_scanner_context.h | 7 ++-
be/src/vec/exec/scan/scanner_context.cpp | 68 +++++++++++++-----------------
be/src/vec/exec/scan/scanner_context.h | 28 +++++-------
be/src/vec/exec/scan/scanner_scheduler.cpp | 12 ++++--
7 files changed, 55 insertions(+), 66 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index c8f46b2ed12..dc30bf163a5 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -41,7 +41,6 @@
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
-#include "util/stack_util.h"
#include "util/uid_util.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
@@ -206,7 +205,7 @@ Status ExecNode::close(RuntimeState* state) {
<< " already closed";
return Status::OK();
}
- LOG(INFO) << "fragment_instance_id=" <<
print_id(state->fragment_instance_id()) << " closed. ";
+ LOG(INFO) << "fragment_instance_id=" <<
print_id(state->fragment_instance_id()) << " closed";
_is_closed = true;
Status result;
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 1f15b1d61f8..f34461a9fd2 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -44,7 +44,7 @@ bool ScanOperator::can_read() {
return true;
} else {
if (_node->_scanner_ctx->get_num_running_scanners() == 0 &&
- _node->_scanner_ctx->should_be_scheduled()) {
+ _node->_scanner_ctx->has_enough_space_in_blocks_queue()) {
_node->_scanner_ctx->reschedule_scanner_ctx();
}
return _node->ready_to_read(); // there are some blocks to process
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index a98085c342f..2d7f8c79520 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -53,7 +53,6 @@
#include "util/container_util.hpp"
#include "util/defer_op.h"
#include "util/pretty_printer.h"
-#include "util/stack_util.h"
#include "util/telemetry/telemetry.h"
#include "util/threadpool.h"
#include "util/time.h"
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index f52bd3bf3c7..b98c628368e 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -166,6 +166,10 @@ public:
_free_blocks_memory_usage->add(free_blocks_memory_usage);
}
+ bool has_enough_space_in_blocks_queue() const override {
+ return _current_used_bytes < _max_bytes_in_queue / 2 *
_num_parallel_instances;
+ }
+
void _dispose_coloate_blocks_not_in_queue() override {
if (_need_colocate_distribute) {
for (int i = 0; i < _num_parallel_instances; ++i) {
@@ -217,7 +221,8 @@ private:
std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
}
- _colocate_blocks[loc] = get_free_block();
+ bool get_block_not_empty = true;
+ _colocate_blocks[loc] = get_free_block(&get_block_not_empty,
get_block_not_empty);
_colocate_mutable_blocks[loc]->set_muatable_columns(
_colocate_blocks[loc]->mutate_columns());
}
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 8deb2153478..478d9fb4cb7 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -52,7 +52,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_,
doris::vectorized::V
_process_status(Status::OK()),
_batch_size(state_->batch_size()),
limit(limit_),
- _max_bytes_in_queue(max_bytes_in_blocks_queue_ *
num_parallel_instances),
+ _max_bytes_in_queue(max_bytes_in_blocks_queue_),
_scanner_scheduler(state_->exec_env()->scanner_scheduler()),
_scanners(scanners_),
_num_parallel_instances(num_parallel_instances) {
@@ -63,21 +63,26 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_,
doris::vectorized::V
if (limit < 0) {
limit = -1;
}
+}
+// After init function call, should not access _parent
+Status ScannerContext::init() {
+ // 1. Calculate max concurrency
+ // TODO: now the max thread num <=
config::doris_scanner_thread_pool_thread_num / 4
+ // should find a more reasonable value.
_max_thread_num = config::doris_scanner_thread_pool_thread_num / 4;
- _max_thread_num *= num_parallel_instances;
+ if (_parent->_shared_scan_opt) {
+ DCHECK(_num_parallel_instances > 0);
+ _max_thread_num *= _num_parallel_instances;
+ }
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
DCHECK(_max_thread_num > 0);
_max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
- // 1. Calculate max concurrency
// For select * from table limit 10; should just use one thread.
if (_parent->should_run_serial()) {
_max_thread_num = 1;
}
-}
-// After init function call, should not access _parent
-Status ScannerContext::init() {
_scanner_profile = _parent->_scanner_profile;
_scanner_sched_counter = _parent->_scanner_sched_counter;
_scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
@@ -99,9 +104,6 @@ Status ScannerContext::init() {
limit == -1 ? _batch_size :
std::min(static_cast<int64_t>(_batch_size), limit);
_block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) /
real_block_size;
_free_blocks_capacity = _max_thread_num * _block_per_scanner;
- auto block = get_free_block();
- _estimated_block_bytes = std::max(block->allocated_bytes(), (size_t)16);
- return_free_block(std::move(block));
#ifndef BE_TEST
// 3. get thread token
@@ -121,33 +123,27 @@ Status ScannerContext::init() {
return Status::OK();
}
-vectorized::BlockUPtr ScannerContext::get_free_block() {
+vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block,
+ bool get_block_not_empty)
{
vectorized::BlockUPtr block;
if (_free_blocks.try_dequeue(block)) {
- DCHECK(block->mem_reuse());
- _free_blocks_memory_usage->add(-block->allocated_bytes());
- _serving_blocks_num++;
- return block;
+ if (!get_block_not_empty || block->mem_reuse()) {
+ _free_blocks_capacity--;
+ _free_blocks_memory_usage->add(-block->allocated_bytes());
+ return block;
+ }
}
- block = vectorized::Block::create_unique(_output_tuple_desc->slots(),
_batch_size,
- true /*ignore invalid slots*/);
COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
-
- _serving_blocks_num++;
- return block;
+ return vectorized::Block::create_unique(_output_tuple_desc->slots(),
_batch_size,
+ true /*ignore invalid slots*/);
}
void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block>
block) {
- _serving_blocks_num--;
- if (block->mem_reuse()) {
- // Only put blocks with schema to free blocks, because colocate blocks
- // need schema.
- _estimated_block_bytes = std::max(block->allocated_bytes(),
(size_t)16);
- block->clear_column_data();
- _free_blocks_memory_usage->add(block->allocated_bytes());
- _free_blocks.enqueue(std::move(block));
- }
+ block->clear_column_data();
+ _free_blocks_memory_usage->add(block->allocated_bytes());
+ _free_blocks.enqueue(std::move(block));
+ ++_free_blocks_capacity;
}
void
ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
blocks) {
@@ -180,7 +176,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
// (if the scheduler continues to schedule, it will cause a lot of busy
running).
// At this point, consumers are required to trigger new scheduling to
ensure that
// data can be continuously fetched.
- if (should_be_scheduled() && _num_running_scanners == 0) {
+ if (has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
@@ -188,7 +184,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
set_status_on_error(state, false);
}
}
-
// Wait for block from queue
if (wait) {
SCOPED_TIMER(_scanner_wait_batch_timer);
@@ -212,7 +207,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
-
_queued_blocks_memory_usage->add(-block_bytes);
return Status::OK();
} else {
@@ -359,13 +353,7 @@ void
ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
_scanners.push_front(scanner);
}
std::lock_guard l(_transfer_lock);
-
- // In pipeline engine, doris will close scanners when `no_schedule`.
- // We have to decrease _num_running_scanners before schedule, otherwise
- // schedule does not woring due to _num_running_scanners.
- _num_running_scanners--;
-
- if (should_be_scheduled()) {
+ if (has_enough_space_in_blocks_queue()) {
auto state = _scanner_scheduler->submit(this);
if (state.ok()) {
_num_scheduling_ctx++;
@@ -385,6 +373,8 @@ void
ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
_is_finished = true;
_blocks_queue_added_cv.notify_one();
}
+ // In pipeline engine, doris will close scanners when `no_schedule`.
+ _num_running_scanners--;
_ctx_finish_cv.notify_one();
}
@@ -394,7 +384,7 @@ void
ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
{
// If there are enough space in blocks queue,
// the scanner number depends on the _free_blocks numbers
- thread_slot_num = get_available_thread_slot_num();
+ thread_slot_num = cal_thread_slot_num_by_free_block_num();
}
// 2. get #thread_slot_num scanners from ctx->scanners
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index a345bfc03dd..3aad0d6263f 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -62,12 +62,12 @@ public:
ScannerContext(RuntimeState* state_, VScanNode* parent,
const TupleDescriptor* output_tuple_desc,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
- int64_t max_bytes_in_blocks_queue_, const int
num_parallel_instances = 1);
+ int64_t max_bytes_in_blocks_queue_, const int
num_parallel_instances = 0);
virtual ~ScannerContext() = default;
virtual Status init();
- vectorized::BlockUPtr get_free_block();
+ vectorized::BlockUPtr get_free_block(bool* has_free_block, bool
get_not_empty_block = false);
void return_free_block(std::unique_ptr<vectorized::Block> block);
// Append blocks from scanners to the blocks queue.
@@ -136,25 +136,20 @@ public:
virtual bool empty_in_queue(int id);
// todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when
executing shared scan
- inline bool should_be_scheduled() const {
- return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
- (_serving_blocks_num < allowed_blocks_num());
+ virtual inline bool has_enough_space_in_blocks_queue() const {
+ return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
}
- int get_available_thread_slot_num() {
+ int cal_thread_slot_num_by_free_block_num() {
int thread_slot_num = 0;
- thread_slot_num = (allowed_blocks_num() + _block_per_scanner - 1) /
_block_per_scanner;
+ thread_slot_num = (_free_blocks_capacity + _block_per_scanner - 1) /
_block_per_scanner;
thread_slot_num = std::min(thread_slot_num, _max_thread_num -
_num_running_scanners);
+ if (thread_slot_num <= 0) {
+ thread_slot_num = 1;
+ }
return thread_slot_num;
}
- int32_t allowed_blocks_num() const {
- int32_t blocks_num = std::min(_free_blocks_capacity,
- int32_t((_max_bytes_in_queue +
_estimated_block_bytes - 1) /
- _estimated_block_bytes));
- return blocks_num;
- }
-
void reschedule_scanner_ctx();
// the unique id of this context
@@ -208,12 +203,10 @@ protected:
// Lazy-allocated blocks for all scanners to share, for memory reuse.
moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
- std::atomic<int32_t> _serving_blocks_num = 0;
// The current number of free blocks available to the scanners.
// Used to limit the memory usage of the scanner.
// NOTE: this is NOT the size of `_free_blocks`.
- int32_t _free_blocks_capacity = 0;
- int64_t _estimated_block_bytes = 0;
+ std::atomic_int32_t _free_blocks_capacity = 0;
int _batch_size;
// The limit from SQL's limit clause
@@ -238,7 +231,6 @@ protected:
int64_t _cur_bytes_in_queue = 0;
// The max limit bytes of blocks in blocks queue
const int64_t _max_bytes_in_queue;
- std::atomic<int64_t> _bytes_allocated = 0;
doris::vectorized::ScannerScheduler* _scanner_scheduler;
// List "scanners" saves all "unfinished" scanners.
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 2529ce67e5e..3481128a1d2 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -321,6 +321,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
int64_t raw_bytes_read = 0;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
+ bool has_free_block = true;
int num_rows_in_block = 0;
// Only set to true when ctx->done() return true.
@@ -330,8 +331,9 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
bool should_stop = false;
// Has to wait at least one full block, or it will cause a lot of schedule
task in priority
// queue, it will affect query latency and query concurrency for example
ssb 3.3.
- while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read <
raw_rows_threshold &&
- num_rows_in_block < state->batch_size()) {
+ while (!eos && raw_bytes_read < raw_bytes_threshold &&
+ ((raw_rows_read < raw_rows_threshold && has_free_block) ||
+ num_rows_in_block < state->batch_size())) {
if (UNLIKELY(ctx->done())) {
// No need to set status on error here.
// Because done() maybe caused by "should_stop"
@@ -339,7 +341,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
break;
}
- BlockUPtr block = ctx->get_free_block();
+ BlockUPtr block = ctx->get_free_block(&has_free_block);
status = scanner->get_block(state, block.get(), &eos);
VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " <<
eos;
// The VFileScanner for external table may try to open not exist files,
@@ -355,11 +357,12 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
if (status.is<ErrorCode::NOT_FOUND>()) {
// The only case in this "if" branch is external table file delete
and fe cache has not been updated yet.
// Set status to OK.
+ LOG(INFO) << "scan range not found: " <<
scanner->get_current_scan_range_name();
status = Status::OK();
eos = true;
}
- raw_bytes_read += block->allocated_bytes();
+ raw_bytes_read += block->bytes();
num_rows_in_block += block->rows();
if (UNLIKELY(block->rows() == 0)) {
ctx->return_free_block(std::move(block));
@@ -394,6 +397,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler, ScannerContext
if (eos || should_stop) {
scanner->mark_to_need_to_close();
}
+
ctx->push_back_scanner_and_reschedule(scanner);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]