This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 28a8c7620c0 branch-3.1: [bugfix](be_metrics) update scan bytes metric
correctly #52232 (#52584)
28a8c7620c0 is described below
commit 28a8c7620c074b97285bbf529b9d6740a07553e3
Author: yiguolei <[email protected]>
AuthorDate: Tue Jul 1 19:44:20 2025 +0800
branch-3.1: [bugfix](be_metrics) update scan bytes metric correctly #52232
(#52584)
Cherry-picked from #52232
---
be/src/olap/rowset/segment_v2/page_io.cpp | 2 ++
be/src/util/doris_metrics.cpp | 4 +++
be/src/util/doris_metrics.h | 2 ++
be/src/vec/exec/scan/new_olap_scanner.cpp | 42 +++++++++++++++++++++++-------
be/src/vec/exec/scan/new_olap_scanner.h | 4 +--
be/src/vec/exec/scan/scanner_scheduler.cpp | 5 +++-
be/src/vec/exec/scan/vscanner.cpp | 7 -----
be/src/vec/exec/scan/vscanner.h | 4 +++
8 files changed, 51 insertions(+), 19 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp
b/be/src/olap/rowset/segment_v2/page_io.cpp
index 004ebfbc84a..0e3d891aa07 100644
--- a/be/src/olap/rowset/segment_v2/page_io.cpp
+++ b/be/src/olap/rowset/segment_v2/page_io.cpp
@@ -147,6 +147,8 @@ Status PageIO::read_and_decompress_page_(const
PageReadOptions& opts, PageHandle
footer_size,
opts.file_reader->path().native());
}
*body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
+ // If the page is read from cache, it should also be recorded in the uncompressed bytes
read counter.
+ opts.stats->uncompressed_bytes_read += body->size;
return Status::OK();
}
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index e7e700bfaf3..7dee4e2a9fe 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -43,6 +43,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fragment_requests_total,
MetricUnit::REQUES
"Total fragment requests received.");
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(fragment_request_duration_us,
MetricUnit::MICROSECONDS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes, MetricUnit::BYTES);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes_from_local,
MetricUnit::BYTES);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes_from_remote,
MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_rows, MetricUnit::ROWS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_count, MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(push_requests_success_total,
MetricUnit::REQUESTS, "",
@@ -238,6 +240,8 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
fragment_requests_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
fragment_request_duration_us);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
query_scan_bytes_from_local);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
query_scan_bytes_from_remote);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_rows);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
push_requests_success_total);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index d95eee6800e..94e13554eb7 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -50,6 +50,8 @@ public:
IntCounter* fragment_requests_total = nullptr;
IntCounter* fragment_request_duration_us = nullptr;
IntCounter* query_scan_bytes = nullptr;
+ IntCounter* query_scan_bytes_from_local = nullptr;
+ IntCounter* query_scan_bytes_from_remote = nullptr;
IntCounter* query_scan_rows = nullptr;
IntCounter* push_requests_success_total = nullptr;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index ba5330d166c..4a5883da55f 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -524,7 +524,6 @@ Status NewOlapScanner::_get_block_impl(RuntimeState* state,
Block* block, bool*
_tablet_reader_params.tablet->read_block_count.fetch_add(1,
std::memory_order_relaxed);
*eof = false;
}
- _update_realtime_counters();
return Status::OK();
}
@@ -537,17 +536,42 @@ Status NewOlapScanner::close(RuntimeState* state) {
return Status::OK();
}
-void NewOlapScanner::_update_realtime_counters() {
+void NewOlapScanner::update_realtime_counters() {
pipeline::OlapScanLocalState* local_state =
static_cast<pipeline::OlapScanLocalState*>(_local_state);
const OlapReaderStatistics& stats = _tablet_reader->stats();
COUNTER_UPDATE(local_state->_read_compressed_counter,
stats.compressed_bytes_read);
- COUNTER_UPDATE(local_state->_scan_bytes, stats.compressed_bytes_read);
- _tablet_reader->mutable_stats()->compressed_bytes_read = 0;
-
+ COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read);
COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read);
- // if raw_rows_read is reset, scanNode will scan all table rows which may
cause BE crash
+
+ // Make sure the scan bytes and scan rows counters in the audit log are the same
as the counters in
+ // doris metrics.
+ // ScanBytes is the uncompressed bytes read from local + remote
+ // bytes_read_from_local is the compressed bytes read from local
+ // bytes_read_from_remote is the compressed bytes read from remote
+ // scan bytes > bytes_read_from_local + bytes_read_from_remote
+ if (_query_statistics) {
+ _query_statistics->add_scan_rows(stats.raw_rows_read);
+ _query_statistics->add_scan_bytes(stats.uncompressed_bytes_read);
+ }
+
+ // In case of no cache, we still need to update the IO stats: both file cache counters are zero,
so all compressed bytes read are counted as local.
+ if (stats.file_cache_stats.bytes_read_from_local == 0 &&
+ stats.file_cache_stats.bytes_read_from_remote == 0) {
+ DorisMetrics::instance()->query_scan_bytes_from_local->increment(
+ stats.compressed_bytes_read);
+ } else {
+ DorisMetrics::instance()->query_scan_bytes_from_local->increment(
+ stats.file_cache_stats.bytes_read_from_local);
+ DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
+ stats.file_cache_stats.bytes_read_from_remote);
+ }
+
+ _tablet_reader->mutable_stats()->compressed_bytes_read = 0;
+ _tablet_reader->mutable_stats()->uncompressed_bytes_read = 0;
_tablet_reader->mutable_stats()->raw_rows_read = 0;
+ _tablet_reader->mutable_stats()->file_cache_stats.bytes_read_from_local =
0;
+ _tablet_reader->mutable_stats()->file_cache_stats.bytes_read_from_remote =
0;
}
void NewOlapScanner::_collect_profile_before_close() {
@@ -563,7 +587,7 @@ void NewOlapScanner::_collect_profile_before_close() {
#define INCR_COUNTER(Parent)
\
COUNTER_UPDATE(Parent->_io_timer, stats.io_ns);
\
COUNTER_UPDATE(Parent->_read_compressed_counter,
stats.compressed_bytes_read); \
- COUNTER_UPDATE(Parent->_scan_bytes, stats.compressed_bytes_read);
\
+ COUNTER_UPDATE(Parent->_scan_bytes, stats.uncompressed_bytes_read);
\
COUNTER_UPDATE(Parent->_decompressor_timer, stats.decompress_ns);
\
COUNTER_UPDATE(Parent->_read_uncompressed_counter,
stats.uncompressed_bytes_read); \
COUNTER_UPDATE(Parent->_block_load_timer, stats.block_load_ns);
\
@@ -710,10 +734,10 @@ void NewOlapScanner::_collect_profile_before_close() {
// Update metrics
DorisMetrics::instance()->query_scan_bytes->increment(
- local_state->_read_compressed_counter->value());
+ local_state->_read_uncompressed_counter->value());
DorisMetrics::instance()->query_scan_rows->increment(local_state->_scan_rows->value());
auto& tablet = _tablet_reader_params.tablet;
-
tablet->query_scan_bytes->increment(local_state->_read_compressed_counter->value());
+
tablet->query_scan_bytes->increment(local_state->_read_uncompressed_counter->value());
tablet->query_scan_rows->increment(local_state->_scan_rows->value());
tablet->query_scan_count->increment(1);
if (_query_statistics) {
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h
b/be/src/vec/exec/scan/new_olap_scanner.h
index fd1246b120b..73e867ac4ce 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -77,13 +77,13 @@ public:
doris::TabletStorageType get_storage_type() override;
+ void update_realtime_counters() override;
+
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
void _collect_profile_before_close() override;
private:
- void _update_realtime_counters();
-
Status _init_tablet_reader_params(const std::vector<OlapScanRange*>&
key_ranges,
const std::vector<TCondition>& filters,
const pipeline::FilterPredicates&
filter_predicates,
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index b518056c897..aed708442ae 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -313,8 +313,11 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
scan_task->set_status(status);
eos = true;
}
-
+ // The WorkloadGroup policy checks CPU time in real time, so we should update
the counters
+ // as soon as possible; they cannot wait to be updated on close.
scanner->update_scan_cpu_timer();
+ scanner->update_realtime_counters();
+
if (eos) {
scanner->mark_to_need_to_close();
}
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 0087a19d92f..96a384177de 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -103,8 +103,6 @@ Status VScanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
}
}
- int64_t old_scan_rows = _num_rows_read;
- int64_t old_scan_bytes = _num_byte_read;
{
do {
// if step 2 filter all rows of block, and block will be reused to
get next rows,
@@ -136,11 +134,6 @@ Status VScanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
_num_rows_read < rows_read_threshold);
}
- if (_query_statistics) {
- _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows);
- _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes);
- }
-
if (state->is_cancelled()) {
// TODO: Should return the specific ErrorStatus instead of just
Cancelled.
return Status::Cancelled("cancelled");
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index bb68055e1f0..ac6371c3abc 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -128,6 +128,10 @@ public:
void update_scan_cpu_timer();
+ // Some counters need to be updated in real time; for example, the workload group
policy needs
+ // scan bytes to cancel queries that exceed the limit.
+ virtual void update_realtime_counters() {}
+
RuntimeState* runtime_state() { return _state; }
bool is_open() { return _is_open; }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]