This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 648c622b95c [opt](scanner profile) More counter for scanner (#40144)
648c622b95c is described below
commit 648c622b95c2538fc74b8ded99bad4291314859b
Author: zhiqiang <[email protected]>
AuthorDate: Fri Sep 6 17:49:24 2024 +0800
[opt](scanner profile) More counter for scanner (#40144)
New profile metrics to monitor the scheduling process of scanners.
```
VScanner:
...
- PeakMemoryUsage: 16.31 MB
- PeakRunningScanner: 1
...
```
In general, the value of `PeakMemoryUsage` increases when any of the scan
tasks gets a block, and decreases when the ScanOperator consumes a block from
block_queue.
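For readers less familiar with high-water-mark counters, the minimal sketch below
illustrates the behavior these two metrics rely on: add() applies a signed delta to
a current value and remembers the largest value ever observed, and that maximum is
what the profile prints as the peak. The `HighWaterMarkSketch` class is a hypothetical
stand-in for illustration only, not the actual RuntimeProfile::HighWaterMarkCounter
implementation.
```
#include <atomic>
#include <cstdint>
#include <iostream>

// Hypothetical stand-in for a high-water-mark counter: add() applies a signed
// delta to the current value and records the maximum value ever observed.
class HighWaterMarkSketch {
public:
    void add(int64_t delta) {
        int64_t cur = _current.fetch_add(delta, std::memory_order_relaxed) + delta;
        int64_t peak = _peak.load(std::memory_order_relaxed);
        while (cur > peak &&
               !_peak.compare_exchange_weak(peak, cur, std::memory_order_relaxed)) {
        }
    }
    int64_t peak() const { return _peak.load(std::memory_order_relaxed); }

private:
    std::atomic<int64_t> _current{0};
    std::atomic<int64_t> _peak{0};
};

int main() {
    HighWaterMarkSketch peak_memory;
    // A scan task obtains a 16 MB block: current usage (and the peak) go up.
    peak_memory.add(16LL * 1024 * 1024);
    // The ScanOperator consumes the block: current usage drops back to zero.
    peak_memory.add(-16LL * 1024 * 1024);
    // The profile still reports the high-water mark, not the current value.
    std::cout << "PeakMemoryUsage: " << peak_memory.peak() << " bytes" << std::endl;
    return 0;
}
```
With this behavior, `PeakMemoryUsage` reflects the largest amount of block memory held
by the scanners at any single point in time even though the current usage returns to
zero once all blocks are consumed, and `PeakRunningScanner` tracks the number of
concurrently running scanner tasks the same way.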
---
be/src/pipeline/exec/scan_operator.cpp | 8 ++
be/src/pipeline/exec/scan_operator.h | 2 +
be/src/vec/exec/scan/scanner_context.cpp | 49 +++++++---
be/src/vec/exec/scan/scanner_context.h | 13 +--
be/src/vec/exec/scan/scanner_scheduler.cpp | 24 ++++-
be/src/vec/exec/scan/vscanner.cpp | 1 +
.../suites/query_profile/scanner_profile.groovy | 104 +++++++++++++++++++++
7 files changed, 177 insertions(+), 24 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 73cd02b5a5d..0c0cfb18c77 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -997,6 +997,9 @@ Status ScanLocalState<Derived>::_start_scanners(
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(),
scanners, p.limit(),
state()->scan_queue_mem_limit(), _scan_dependency,
+ // NOTE: This logic will make _max_thread_num of ScannerContext be C (num of cores) * 2.
+ // For a query with C/2 instances and M scan nodes, the number of scan tasks of this query will be
+ // at most C/2 * M * C*2, i.e. C*C*M.
// 1. If data distribution is ignored, we use 1 instance to scan.
// 2. Else if this operator is not file scan operator, we use config::doris_scanner_thread_pool_thread_num scanners to scan.
// 3. Else, file scanner will consume much memory so we use config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num scanners to scan.
@@ -1057,6 +1060,9 @@ Status ScanLocalState<Derived>::_init_profile() {
_memory_usage_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_scanner_profile,
"MemoryUsage", 1);
_free_blocks_memory_usage =
_scanner_profile->AddHighWaterMarkCounter("FreeBlocks",
TUnit::BYTES, "MemoryUsage", 1);
+ _scanner_peak_memory_usage =
+         _scanner_profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES);
+
_newly_create_free_blocks_num =
ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum",
TUnit::UNIT);
_scale_up_scanners_counter = ADD_COUNTER(_scanner_profile,
"NumScaleUpScanners", TUnit::UNIT);
@@ -1075,6 +1081,8 @@ Status ScanLocalState<Derived>::_init_profile() {
_max_scanner_thread_num = ADD_COUNTER(_runtime_profile,
"MaxScannerThreadNum", TUnit::UNIT);
+ _peak_running_scanner =
+         _scanner_profile->AddHighWaterMarkCounter("PeakRunningScanner", TUnit::UNIT);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index defd5606f74..fed1e4015d8 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -109,6 +109,8 @@ protected:
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
// Max num of scanner thread
RuntimeProfile::Counter* _max_scanner_thread_num = nullptr;
+ RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
+ RuntimeProfile::HighWaterMarkCounter* _scanner_peak_memory_usage = nullptr;
// time of get block from scanner
RuntimeProfile::Counter* _scan_timer = nullptr;
RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index bab11616c77..5cc20c214c1 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -73,11 +73,23 @@ ScannerContext::ScannerContext(
limit = -1;
}
MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
+ // _max_thread_num controls how many scanners of this ScanOperator can be submitted to the scheduler at a time.
+ // The overall target of our system is to make full utilization of the resources.
+ // At the same time, we don't want too many tasks queued by the scheduler; that makes the query
+ // wait too long, and existing tasks cannot be scheduled in time.
+ // First of all, we try to make sure the _max_thread_num of a ScanNode of a query on a single backend is less than
+ // config::doris_scanner_thread_pool_thread_num.
+ // For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64*2 = 128,
+ // and the num_parallel_instances of this scan operator will be 64/2 = 32.
+ // For a query that has two scan nodes, the _max_thread_num of each scan node instance will be 128 / 32 = 4.
+ // We have 32 instances of this scan operator, so for the ScanNode, 4 * 32 = 128 scanner tasks can be submitted at a time.
+ // Remember that we have two ScanNodes in this query, so the total number of scanner tasks that can be submitted at a time is 128 * 2 = 256.
_max_thread_num =
_state->num_scanner_threads() > 0
? _state->num_scanner_threads()
: config::doris_scanner_thread_pool_thread_num /
num_parallel_instances;
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
+ // In some situations, there are not too many big tablets involved, so we can reduce the thread number.
_max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
// 1. Calculate max concurrency
// For select * from table limit 10; should just use one thread.
@@ -116,7 +128,6 @@ Status ScannerContext::init() {
_scanner_sched_counter = _local_state->_scanner_sched_counter;
_newly_create_free_blocks_num =
_local_state->_newly_create_free_blocks_num;
_scanner_wait_batch_timer = _local_state->_scanner_wait_batch_timer;
- _free_blocks_memory_usage_mark = _local_state->_free_blocks_memory_usage;
_scanner_ctx_sched_time = _local_state->_scanner_ctx_sched_time;
_scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
@@ -157,9 +168,11 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
vectorized::BlockUPtr block = nullptr;
if (_free_blocks.try_dequeue(block)) {
DCHECK(block->mem_reuse());
- _free_blocks_memory_usage -= block->allocated_bytes();
- _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
- } else if (_free_blocks_memory_usage < _max_bytes_in_queue || force) {
+ _block_memory_usage -= block->allocated_bytes();
+ // A free block is reused, so the memory usage should be decreased
+ // The caller of get_free_block will increase the memory usage
+ update_peak_memory_usage(-block->allocated_bytes());
+ } else if (_block_memory_usage < _max_bytes_in_queue || force) {
_newly_create_free_blocks_num->update(1);
block = vectorized::Block::create_unique(_output_tuple_desc->slots(),
0,
true /*ignore invalid
slots*/);
@@ -168,11 +181,13 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
}
void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
- if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) {
- _free_blocks_memory_usage += block->allocated_bytes();
- _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+ if (block->mem_reuse() && _block_memory_usage < _max_bytes_in_queue) {
+ size_t block_size_to_reuse = block->allocated_bytes();
+ _block_memory_usage += block_size_to_reuse;
block->clear_column_data();
- _free_blocks.enqueue(std::move(block));
+ if (_free_blocks.enqueue(std::move(block))) {
+ update_peak_memory_usage(block_size_to_reuse);
+ }
}
}
@@ -242,8 +257,8 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
if (_estimated_block_size > block_size) {
_estimated_block_size = block_size;
}
- _free_blocks_memory_usage -= block_size;
- _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+ _block_memory_usage -= block_size;
+ update_peak_memory_usage(-current_block->allocated_bytes());
// consume current block
block->swap(*current_block);
return_free_block(std::move(current_block));
@@ -263,8 +278,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
for (int i = 0; i < free_blocks_for_each; ++i) {
vectorized::BlockUPtr removed_block;
if (_free_blocks.try_dequeue(removed_block)) {
-                        _free_blocks_memory_usage -= block->allocated_bytes();
-                        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+ _block_memory_usage -= block->allocated_bytes();
}
}
}
@@ -314,8 +328,7 @@ void ScannerContext::_try_to_scale_up() {
int num_add = int(std::min(_num_running_scanners * SCALE_UP_RATIO,
_max_thread_num * MAX_SCALE_UP_RATIO -
_num_running_scanners));
if (_estimated_block_size > 0) {
-            int most_add =
-                    (_max_bytes_in_queue - _free_blocks_memory_usage) / _estimated_block_size;
+            int most_add = (_max_bytes_in_queue - _block_memory_usage) / _estimated_block_size;
num_add = std::min(num_add, most_add);
}
for (int i = 0; i < num_add; ++i) {
@@ -445,4 +458,12 @@ void ScannerContext::_set_scanner_done() {
_dependency->set_always_ready();
}
+void ScannerContext::update_peak_running_scanner(int num) {
+ _local_state->_peak_running_scanner->add(num);
+}
+
+void ScannerContext::update_peak_memory_usage(int64_t usage) {
+ _local_state->_scanner_peak_memory_usage->add(usage);
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index d97fc731fe5..f93d01eef88 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -122,10 +122,12 @@ public:
vectorized::BlockUPtr get_free_block(bool force);
void return_free_block(vectorized::BlockUPtr block);
- inline void inc_free_block_usage(size_t usage) {
- _free_blocks_memory_usage += usage;
- _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
- }
+ inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }
+
+    // Caller should make sure the pipeline task is still running when calling this function
+    void update_peak_running_scanner(int num);
+    // Caller should make sure the pipeline task is still running when calling this function
+    void update_peak_memory_usage(int64_t usage);
// Get next block from blocks queue. Called by ScanNode/ScanOperator
// Set eos to true if there is no more data to read.
@@ -223,7 +225,6 @@ protected:
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage_mark = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
QueryThreadContext _query_thread_context;
@@ -231,7 +232,7 @@ protected:
// for scaling up the running scanners
size_t _estimated_block_size = 0;
- std::atomic_long _free_blocks_memory_usage = 0;
+ std::atomic_long _block_memory_usage = 0;
int64_t _last_scale_up_time = 0;
int64_t _last_fetch_time = 0;
int64_t _total_wait_block_time = 0;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 351f5d4e275..e30983932ee 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -34,6 +34,7 @@
#include "common/status.h"
#include "olap/tablet.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/async_io.h" // IWYU pragma: keep
@@ -210,6 +211,9 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
return;
}
+ ctx->update_peak_running_scanner(1);
+ Defer defer([&] { ctx->update_peak_running_scanner(-1); });
+
std::shared_ptr<ScannerDelegate> scanner_delegate =
scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
return;
@@ -267,13 +271,18 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
if (free_block == nullptr) {
break;
}
+ // We got a newly created block or a reused block.
+ ctx->update_peak_memory_usage(free_block->allocated_bytes());
+ ctx->update_peak_memory_usage(-free_block->allocated_bytes());
status = scanner->get_block_after_projects(state,
free_block.get(), &eos);
+ // Projection will truncate useless columns, which makes the block size change.
+ auto free_block_bytes = free_block->allocated_bytes();
+ ctx->update_peak_memory_usage(free_block_bytes);
first_read = false;
if (!status.ok()) {
LOG(WARNING) << "Scan thread read VScanner failed: " <<
status.to_string();
break;
}
- auto free_block_bytes = free_block->allocated_bytes();
raw_bytes_read += free_block_bytes;
if (!scan_task->cached_blocks.empty() &&
scan_task->cached_blocks.back().first->rows() +
free_block->rows() <=
@@ -281,18 +290,25 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
size_t block_size =
scan_task->cached_blocks.back().first->allocated_bytes();
vectorized::MutableBlock mutable_block(
scan_task->cached_blocks.back().first.get());
+                ctx->update_peak_memory_usage(-mutable_block.allocated_bytes());
status = mutable_block.merge(*free_block);
+                ctx->update_peak_memory_usage(mutable_block.allocated_bytes());
if (!status.ok()) {
LOG(WARNING) << "Block merge failed: " <<
status.to_string();
break;
}
+                scan_task->cached_blocks.back().second = mutable_block.allocated_bytes();
scan_task->cached_blocks.back().first.get()->set_columns(
std::move(mutable_block.mutable_columns()));
+
+            // Whether returning the block succeeds or not, this free_block is no longer used by this scan task.
+            ctx->update_peak_memory_usage(-free_block_bytes);
+            // If the block can be reused, its memory usage will be added back.
ctx->return_free_block(std::move(free_block));
-            ctx->inc_free_block_usage(
-                    scan_task->cached_blocks.back().first->allocated_bytes() - block_size);
+            ctx->inc_block_usage(scan_task->cached_blocks.back().first->allocated_bytes() -
+                                 block_size);
} else {
- ctx->inc_free_block_usage(free_block->allocated_bytes());
+ ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
} // end for while
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 43d791caffa..a78f8956025 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -143,6 +143,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
}
if (state->is_cancelled()) {
+ // TODO: Should return the specific ErrorStatus instead of just Cancelled.
return Status::Cancelled("cancelled");
}
*eof = *eof || _should_stop;
diff --git a/regression-test/suites/query_profile/scanner_profile.groovy b/regression-test/suites/query_profile/scanner_profile.groovy
new file mode 100644
index 00000000000..38216d211e6
--- /dev/null
+++ b/regression-test/suites/query_profile/scanner_profile.groovy
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonOutput
+import groovy.json.JsonSlurper
+import groovy.json.StringEscapeUtils
+
+
+def getProfileList = {
+ def dst = 'http://' + context.config.feHttpAddress
+ def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
+ conn.setRequestMethod("GET")
+ def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+ (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+ conn.setRequestProperty("Authorization", "Basic ${encoding}")
+ return conn.getInputStream().getText()
+}
+
+
+def getProfile = { id ->
+ def dst = 'http://' + context.config.feHttpAddress
+ def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
+ conn.setRequestMethod("GET")
+ def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+ (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+ conn.setRequestProperty("Authorization", "Basic ${encoding}")
+ return conn.getInputStream().getText()
+}
+
+suite('scanner_profile') {
+ sql """
+ DROP TABLE IF EXISTS scanner_profile;
+ """
+ sql """
+ CREATE TABLE if not exists `scanner_profile` (
+ `id` INT,
+ `name` varchar(32)
+ ) ENGINE=OLAP
+ DISTRIBUTED BY HASH(`id`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ // Insert data to table
+ sql """
+ insert into scanner_profile values
+ (1, "A"),(2, "B"),(3, "C"),(4,
"D"),(5,"E"),(6,"F"),(7,"G"),(8,"H"),(9,"K");
+ """
+ sql """
+ insert into scanner_profile values
+ (10, "A"),(20, "B"),(30, "C"),(40,
"D"),(50,"E"),(60,"F"),(70,"G"),(80,"H"),(90,"K");
+ """
+ sql """
+ insert into scanner_profile values
+ (101, "A"),(201, "B"),(301, "C"),(401,
"D"),(501,"E"),(601,"F"),(701,"G"),(801,"H"),(901,"K");
+ """
+ sql """
+ insert into scanner_profile values
+ (1010, "A"),(2010, "B"),(3010, "C"),(4010,
"D"),(5010,"E"),(6010,"F"),(7010,"G"),(8010,"H"),(9010,"K");
+ """
+
+ def uuidString = UUID.randomUUID().toString()
+ sql "set enable_profile=true"
+ // With Limit, MaxScannerThreadNum = 1
+ sql """
+ select "with_limit_1_${uuidString}", * from scanner_profile limit 10;
+ """
+
+ def wholeString = getProfileList()
+ List profileData = new JsonSlurper().parseText(wholeString).data.rows
+ String queryIdWithLimit1 = "";
+
+
+ logger.info("{}", uuidString)
+
+ for (def profileItem in profileData) {
+ if (profileItem["Sql
Statement"].toString().contains("with_limit_1_${uuidString}")) {
+ queryIdWithLimit1 = profileItem["Profile ID"].toString()
+ logger.info("profileItem: {}", profileItem)
+ }
+ }
+
+ logger.info("queryIdWithLimit1_${uuidString}: {}", queryIdWithLimit1)
+
+ assertTrue(queryIdWithLimit1 != "")
+ def String profileWithLimit1 = getProfile(queryIdWithLimit1).toString()
+ logger.info("query profile {}", profileWithLimit1)
+ assertTrue(profileWithLimit1.contains("- PeakRunningScanner: 1"))
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]