This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b92b63e1537 Revert "[fix](scanner) Fix incorrect _max_thread_num in
scanner context" (#40804)
b92b63e1537 is described below
commit b92b63e153789646a96a520456f549202b5eef06
Author: zhiqiang <[email protected]>
AuthorDate: Mon Sep 16 09:03:32 2024 +0800
Revert "[fix](scanner) Fix incorrect _max_thread_num in scanner context"
(#40804)
Reverts apache/doris#40569
We need more tests to avoid performance issues.
---
be/src/pipeline/exec/scan_operator.cpp | 11 +-
be/src/vec/exec/scan/scanner_context.cpp | 120 +++++++++------------
be/src/vec/exec/scan/scanner_context.h | 9 +-
.../java/org/apache/doris/qe/SessionVariable.java | 5 +-
4 files changed, 66 insertions(+), 79 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index eb30d62495d..0c0cfb18c77 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -996,7 +996,16 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(),
scanners, p.limit(),
- _scan_dependency, p.ignore_data_distribution());
+ state()->scan_queue_mem_limit(), _scan_dependency,
+ // NOTE: This logic makes _max_thread_num of ScannerContext
to be C(num of cores) * 2.
+ // For a query with C/2 instances and M scan nodes, the scan tasks of
this query will be C/2 * M * C*2,
+ // i.e. C*C*M at most.
+ // 1. If data distribution is ignored, we use 1 instance to scan.
+ // 2. Else if this operator is not file scan operator, we use
config::doris_scanner_thread_pool_thread_num scanners to scan.
+ // 3. Else, file scanner will consume much memory so we use
config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num
scanners to scan.
+ p.ignore_data_distribution() || !p.is_file_scan_operator()
+ ? 1
+ : state()->query_parallel_instance_num());
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index b5cb47fda1b..cbb3d0f5723 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -42,7 +42,8 @@ ScannerContext::ScannerContext(
RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc, const RowDescriptor*
output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>&
scanners, int64_t limit_,
- std::shared_ptr<pipeline::Dependency> dependency, bool
ignore_data_distribution)
+ int64_t max_bytes_in_blocks_queue,
std::shared_ptr<pipeline::Dependency> dependency,
+ const int num_parallel_instances)
: HasTaskExecutionCtx(state),
_state(state),
_local_state(local_state),
@@ -52,102 +53,53 @@ ScannerContext::ScannerContext(
_output_row_descriptor(output_row_descriptor),
_batch_size(state->batch_size()),
limit(limit_),
+ _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue,
(int64_t)1024) *
+ num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
_all_scanners(scanners.begin(), scanners.end()),
- _ignore_data_distribution(ignore_data_distribution) {
+ _num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
- _scanners.enqueue_bulk(scanners.begin(), scanners.size());
- if (limit < 0) {
- limit = -1;
- }
- MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
- _query_thread_context = {_query_id, _state->query_mem_tracker(),
- _state->get_query_ctx()->workload_group()};
- _dependency = dependency;
-}
-
-// After init function call, should not access _parent
-Status ScannerContext::init() {
- _scanner_profile = _local_state->_scanner_profile;
- _scanner_sched_counter = _local_state->_scanner_sched_counter;
- _newly_create_free_blocks_num =
_local_state->_newly_create_free_blocks_num;
- _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
- _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
- _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
-
-#ifndef BE_TEST
- // 3. get thread token
- if (_state->get_query_ctx()) {
- thread_token = _state->get_query_ctx()->get_token();
- _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
- if (_simple_scan_scheduler) {
- _should_reset_thread_name = false;
- }
- _remote_scan_task_scheduler =
_state->get_query_ctx()->get_remote_scan_scheduler();
- }
-#endif
- _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
- thread_token == nullptr ?
"False" : "True");
-
- int num_parallel_instances = _state->query_parallel_instance_num();
-
- // NOTE: When ignore_data_distribution is true, the parallelism
- // of the scan operator is regarded as 1 (actually maybe not).
- // That will make the number of scan task can be submitted to the scheduler
- // in a vary large value. This logicl is kept from the older
implementation.
- // https://github.com/apache/doris/pull/28266
- if (_ignore_data_distribution) {
- num_parallel_instances = 1;
- }
-
- // _max_bytes_in_queue controls the maximum memory that can be used by a
single scan instance.
- // scan_queue_mem_limit on FE is 100MB by default, on backend we will make
sure its actual value
- // is larger than 10MB.
- _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(),
(int64_t)1024 * 1024 * 10);
-
// Provide more memory for wide tables, increase proportionally by
multiples of 300
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
-
- // TODO: Where is the proper position to place this code?
- if (_all_scanners.empty()) {
+ if (scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}
-
+ _scanners.enqueue_bulk(scanners.begin(), scanners.size());
+ if (limit < 0) {
+ limit = -1;
+ }
+ MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
// _max_thread_num controls how many scanners of this ScanOperator can be
submitted to scheduler at a time.
// The overall target of our system is to make full utilization of the
resources.
- // At the same time, we dont want too many tasks are queued by scheduler,
that is not necessary.
- // So, first of all, we try to make sure _max_thread_num of a ScanNode of
a query on a single backend is less than
- // 2 * config::doris_scanner_thread_pool_thread_num, so that we can make
all io threads busy.
+ // At the same time, we don't want too many tasks to be queued by the scheduler,
since that makes the query
+ // wait too long, and existing tasks cannot be scheduled in time.
+ // First of all, we try to make sure _max_thread_num of a ScanNode of a
query on a single backend is less than
+ // config::doris_scanner_thread_pool_thread_num.
// For example, on a 64-core machine, the default value of
config::doris_scanner_thread_pool_thread_num will be 64*2 =128.
// and the num_parallel_instances of this scan operator will be 64/2=32.
- // For a query who has one scan nodes, the _max_thread_num of each scan
node instance will be 2 * 128 / 32 = 8.
- // We have 32 instances of this scan operator, so for the ScanNode, we
have 8 * 32 = 256 scanner tasks can be submitted at a time.
- // The thread pool of scanner is 128, that means we will have 128 tasks
running in parallel and another 128 tasks are waiting in the queue.
- // When first 128 tasks are finished, the next 128 tasks will be
extricated from the queue and be executed,
- // and another 128 tasks will be submitted to the queue if there are
remaining.
+ // For a query who has two scan nodes, the _max_thread_num of each scan
node instance will be 128 / 32 = 4.
+ // We have 32 instances of this scan operator, so for the ScanNode, we
have 4 * 32 = 128 scanner tasks can be submitted at a time.
+ // Remember that we have two ScanNodes in this query, so the total number of
scanner tasks that can be submitted at a time is 128 * 2 = 256.
_max_thread_num =
_state->num_scanner_threads() > 0
? _state->num_scanner_threads()
- : 2 * (config::doris_scanner_thread_pool_thread_num /
num_parallel_instances);
+ : config::doris_scanner_thread_pool_thread_num /
num_parallel_instances;
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
// In some situation, there are not too many big tablets involed, so we
can reduce the thread number.
- // NOTE: when _all_scanners.size is zero, the _max_thread_num will be 0.
- _max_thread_num = std::min(_max_thread_num, (int32_t)_all_scanners.size());
-
+ _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
// 1. Calculate max concurrency
// For select * from table limit 10; should just use one thread.
if (_local_state->should_run_serial()) {
_max_thread_num = 1;
}
-
// when user not specify scan_thread_num, so we can try downgrade
_max_thread_num.
// becaue we found in a table with 5k columns, column reader may ocuppy
too much memory.
// you can refer https://github.com/apache/doris/issues/35340 for details.
- int32_t max_column_reader_num =
_state->query_options().max_column_reader_num;
+ int32_t max_column_reader_num =
state->query_options().max_column_reader_num;
if (_max_thread_num != 1 && max_column_reader_num > 0) {
int32_t scan_column_num = _output_tuple_desc->slots().size();
int32_t current_column_num = scan_column_num * _max_thread_num;
@@ -157,7 +109,7 @@ Status ScannerContext::init() {
if (new_max_thread_num < _max_thread_num) {
int32_t origin_max_thread_num = _max_thread_num;
_max_thread_num = new_max_thread_num;
- LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
+ LOG(INFO) << "downgrade query:" << print_id(state->query_id())
<< " scan's max_thread_num from " <<
origin_max_thread_num << " to "
<< _max_thread_num << ",column num: " <<
scan_column_num
<< ", max_column_reader_num: " <<
max_column_reader_num;
@@ -165,7 +117,35 @@ Status ScannerContext::init() {
}
}
+ _query_thread_context = {_query_id, _state->query_mem_tracker(),
+ _state->get_query_ctx()->workload_group()};
+ _dependency = dependency;
+}
+
+// After init function call, should not access _parent
+Status ScannerContext::init() {
+ _scanner_profile = _local_state->_scanner_profile;
+ _scanner_sched_counter = _local_state->_scanner_sched_counter;
+ _newly_create_free_blocks_num =
_local_state->_newly_create_free_blocks_num;
+ _scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
+ _scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
+ _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
+
+#ifndef BE_TEST
+ // 3. get thread token
+ if (_state->get_query_ctx()) {
+ thread_token = _state->get_query_ctx()->get_token();
+ _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
+ if (_simple_scan_scheduler) {
+ _should_reset_thread_name = false;
+ }
+ _remote_scan_task_scheduler =
_state->get_query_ctx()->get_remote_scan_scheduler();
+ }
+#endif
+
COUNTER_SET(_local_state->_max_scanner_thread_num,
(int64_t)_max_thread_num);
+ _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
+ thread_token == nullptr ?
"False" : "True");
// submit `_max_thread_num` running scanners to `ScannerScheduler`
// When a running scanners is finished, it will submit one of the
remaining scanners.
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 36eb20c220d..03c4e5a4f1b 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -105,8 +105,9 @@ public:
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
- int64_t limit_, std::shared_ptr<pipeline::Dependency>
dependency,
- bool ignore_data_distribution);
+ int64_t limit_, int64_t max_bytes_in_blocks_queue,
+ std::shared_ptr<pipeline::Dependency> dependency,
+ const int num_parallel_instances);
~ScannerContext() override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
@@ -209,7 +210,7 @@ protected:
int64_t limit;
int32_t _max_thread_num = 0;
- int64_t _max_bytes_in_queue = 0;
+ int64_t _max_bytes_in_queue;
doris::vectorized::ScannerScheduler* _scanner_scheduler;
SimplifiedScanScheduler* _simple_scan_scheduler = nullptr;
SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
@@ -219,6 +220,7 @@ protected:
int32_t _num_running_scanners = 0;
// weak pointer for _scanners, used in stop function
std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
+ const int _num_parallel_instances;
std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
@@ -227,7 +229,6 @@ protected:
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
QueryThreadContext _query_thread_context;
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
- bool _ignore_data_distribution = false;
// for scaling up the running scanners
size_t _estimated_block_size = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 36650001cb0..a97a098bdc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -698,10 +698,7 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
public long maxExecMemByte = 2147483648L;
- @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT,
- description = {"每个 Scan Instance 的 block queue 能够保存多少字节的 block",
- "How many bytes of block can be saved in the block queue
of each Scan Instance"})
- // 100MB
+ @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT)
public long maxScanQueueMemByte = 2147483648L / 20;
@VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true,
description = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]