This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 28a8c7620c0 branch-3.1: [bugfix](be_metrics) update scan bytes metric 
correctly #52232 (#52584)
28a8c7620c0 is described below

commit 28a8c7620c074b97285bbf529b9d6740a07553e3
Author: yiguolei <[email protected]>
AuthorDate: Tue Jul 1 19:44:20 2025 +0800

    branch-3.1: [bugfix](be_metrics) update scan bytes metric correctly #52232 
(#52584)
    
    Cherry-picked from #52232
---
 be/src/olap/rowset/segment_v2/page_io.cpp  |  2 ++
 be/src/util/doris_metrics.cpp              |  4 +++
 be/src/util/doris_metrics.h                |  2 ++
 be/src/vec/exec/scan/new_olap_scanner.cpp  | 42 +++++++++++++++++++++++-------
 be/src/vec/exec/scan/new_olap_scanner.h    |  4 +--
 be/src/vec/exec/scan/scanner_scheduler.cpp |  5 +++-
 be/src/vec/exec/scan/vscanner.cpp          |  7 -----
 be/src/vec/exec/scan/vscanner.h            |  4 +++
 8 files changed, 51 insertions(+), 19 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp 
b/be/src/olap/rowset/segment_v2/page_io.cpp
index 004ebfbc84a..0e3d891aa07 100644
--- a/be/src/olap/rowset/segment_v2/page_io.cpp
+++ b/be/src/olap/rowset/segment_v2/page_io.cpp
@@ -147,6 +147,8 @@ Status PageIO::read_and_decompress_page_(const 
PageReadOptions& opts, PageHandle
                                       footer_size, 
opts.file_reader->path().native());
         }
         *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
+        // If read from cache, it should also be recorded in the uncompressed bytes read counter.
+        opts.stats->uncompressed_bytes_read += body->size;
         return Status::OK();
     }
 
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index e7e700bfaf3..7dee4e2a9fe 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -43,6 +43,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(fragment_requests_total, 
MetricUnit::REQUES
                                      "Total fragment requests received.");
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(fragment_request_duration_us, 
MetricUnit::MICROSECONDS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes, MetricUnit::BYTES);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes_from_local, 
MetricUnit::BYTES);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_bytes_from_remote, 
MetricUnit::BYTES);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_rows, MetricUnit::ROWS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_scan_count, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(push_requests_success_total, 
MetricUnit::REQUESTS, "",
@@ -238,6 +240,8 @@ DorisMetrics::DorisMetrics() : 
_metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
fragment_requests_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
fragment_request_duration_us);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
query_scan_bytes_from_local);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
query_scan_bytes_from_remote);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_rows);
 
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
push_requests_success_total);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index d95eee6800e..94e13554eb7 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -50,6 +50,8 @@ public:
     IntCounter* fragment_requests_total = nullptr;
     IntCounter* fragment_request_duration_us = nullptr;
     IntCounter* query_scan_bytes = nullptr;
+    IntCounter* query_scan_bytes_from_local = nullptr;
+    IntCounter* query_scan_bytes_from_remote = nullptr;
     IntCounter* query_scan_rows = nullptr;
 
     IntCounter* push_requests_success_total = nullptr;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index ba5330d166c..4a5883da55f 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -524,7 +524,6 @@ Status NewOlapScanner::_get_block_impl(RuntimeState* state, 
Block* block, bool*
         _tablet_reader_params.tablet->read_block_count.fetch_add(1, 
std::memory_order_relaxed);
         *eof = false;
     }
-    _update_realtime_counters();
     return Status::OK();
 }
 
@@ -537,17 +536,42 @@ Status NewOlapScanner::close(RuntimeState* state) {
     return Status::OK();
 }
 
-void NewOlapScanner::_update_realtime_counters() {
+void NewOlapScanner::update_realtime_counters() {
     pipeline::OlapScanLocalState* local_state =
             static_cast<pipeline::OlapScanLocalState*>(_local_state);
     const OlapReaderStatistics& stats = _tablet_reader->stats();
     COUNTER_UPDATE(local_state->_read_compressed_counter, 
stats.compressed_bytes_read);
-    COUNTER_UPDATE(local_state->_scan_bytes, stats.compressed_bytes_read);
-    _tablet_reader->mutable_stats()->compressed_bytes_read = 0;
-
+    COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read);
     COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read);
-    // if raw_rows_read is reset, scanNode will scan all table rows which may 
cause BE crash
+
+    // Make sure the scan bytes and scan rows counters in the audit log are the same as the
+    // counters in doris metrics.
+    // ScanBytes is the uncompressed bytes read from local + remote
+    // bytes_read_from_local is the compressed bytes read from local
+    // bytes_read_from_remote is the compressed bytes read from remote
+    // scan bytes > bytes_read_from_local + bytes_read_from_remote
+    if (_query_statistics) {
+        _query_statistics->add_scan_rows(stats.raw_rows_read);
+        _query_statistics->add_scan_bytes(stats.uncompressed_bytes_read);
+    }
+
+    // In case of no cache, we still need to update the IO stats: all compressed bytes read are counted as local.
+    if (stats.file_cache_stats.bytes_read_from_local == 0 &&
+        stats.file_cache_stats.bytes_read_from_remote == 0) {
+        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
+                stats.compressed_bytes_read);
+    } else {
+        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
+                stats.file_cache_stats.bytes_read_from_local);
+        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
+                stats.file_cache_stats.bytes_read_from_remote);
+    }
+
+    _tablet_reader->mutable_stats()->compressed_bytes_read = 0;
+    _tablet_reader->mutable_stats()->uncompressed_bytes_read = 0;
     _tablet_reader->mutable_stats()->raw_rows_read = 0;
+    _tablet_reader->mutable_stats()->file_cache_stats.bytes_read_from_local = 
0;
+    _tablet_reader->mutable_stats()->file_cache_stats.bytes_read_from_remote = 
0;
 }
 
 void NewOlapScanner::_collect_profile_before_close() {
@@ -563,7 +587,7 @@ void NewOlapScanner::_collect_profile_before_close() {
 #define INCR_COUNTER(Parent)                                                   
                 \
     COUNTER_UPDATE(Parent->_io_timer, stats.io_ns);                            
                 \
     COUNTER_UPDATE(Parent->_read_compressed_counter, 
stats.compressed_bytes_read);              \
-    COUNTER_UPDATE(Parent->_scan_bytes, stats.compressed_bytes_read);          
                 \
+    COUNTER_UPDATE(Parent->_scan_bytes, stats.uncompressed_bytes_read);        
                 \
     COUNTER_UPDATE(Parent->_decompressor_timer, stats.decompress_ns);          
                 \
     COUNTER_UPDATE(Parent->_read_uncompressed_counter, 
stats.uncompressed_bytes_read);          \
     COUNTER_UPDATE(Parent->_block_load_timer, stats.block_load_ns);            
                 \
@@ -710,10 +734,10 @@ void NewOlapScanner::_collect_profile_before_close() {
 
     // Update metrics
     DorisMetrics::instance()->query_scan_bytes->increment(
-            local_state->_read_compressed_counter->value());
+            local_state->_read_uncompressed_counter->value());
     
DorisMetrics::instance()->query_scan_rows->increment(local_state->_scan_rows->value());
     auto& tablet = _tablet_reader_params.tablet;
-    
tablet->query_scan_bytes->increment(local_state->_read_compressed_counter->value());
+    
tablet->query_scan_bytes->increment(local_state->_read_uncompressed_counter->value());
     tablet->query_scan_rows->increment(local_state->_scan_rows->value());
     tablet->query_scan_count->increment(1);
     if (_query_statistics) {
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h 
b/be/src/vec/exec/scan/new_olap_scanner.h
index fd1246b120b..73e867ac4ce 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -77,13 +77,13 @@ public:
 
     doris::TabletStorageType get_storage_type() override;
 
+    void update_realtime_counters() override;
+
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
     void _collect_profile_before_close() override;
 
 private:
-    void _update_realtime_counters();
-
     Status _init_tablet_reader_params(const std::vector<OlapScanRange*>& 
key_ranges,
                                       const std::vector<TCondition>& filters,
                                       const pipeline::FilterPredicates& 
filter_predicates,
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index b518056c897..aed708442ae 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -313,8 +313,11 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
         scan_task->set_status(status);
         eos = true;
     }
-
+    // WorkloadGroup policy checks CPU time in real time, so the counters should be updated
+    // as soon as possible; they cannot be deferred until close.
     scanner->update_scan_cpu_timer();
+    scanner->update_realtime_counters();
+
     if (eos) {
         scanner->mark_to_need_to_close();
     }
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 0087a19d92f..96a384177de 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -103,8 +103,6 @@ Status VScanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
         }
     }
 
-    int64_t old_scan_rows = _num_rows_read;
-    int64_t old_scan_bytes = _num_byte_read;
     {
         do {
             // if step 2 filter all rows of block, and block will be reused to 
get next rows,
@@ -136,11 +134,6 @@ Status VScanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
                  _num_rows_read < rows_read_threshold);
     }
 
-    if (_query_statistics) {
-        _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows);
-        _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes);
-    }
-
     if (state->is_cancelled()) {
         // TODO: Should return the specific ErrorStatus instead of just 
Cancelled.
         return Status::Cancelled("cancelled");
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index bb68055e1f0..ac6371c3abc 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -128,6 +128,10 @@ public:
 
     void update_scan_cpu_timer();
 
+    // Some counters need to be updated in real time; for example, the workload group policy needs
+    // scan bytes to cancel queries that exceed the limit.
+    virtual void update_realtime_counters() {}
+
     RuntimeState* runtime_state() { return _state; }
 
     bool is_open() { return _is_open; }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to