This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 03e84346b05 branch-3.1: [enhancement](be_metrics) update scan bytes metric in file_scanner. #53729 (#54222)
03e84346b05 is described below

commit 03e84346b0507b072888a2bc21ca389679f47317
Author: Qi Chen <[email protected]>
AuthorDate: Mon Aug 4 10:01:42 2025 +0800

    branch-3.1: [enhancement](be_metrics) update scan bytes metric in file_scanner. #53729 (#54222)
    
    Cherry-pick #53729
---
 be/src/apache-orc                                  |  2 +-
 be/src/io/fs/tracing_file_reader.h                 | 58 ++++++++++++++++++
 be/src/io/io_common.h                              |  8 +++
 be/src/pipeline/exec/file_scan_operator.h          |  1 +
 be/src/service/internal_service.cpp                |  2 +
 .../vec/exec/format/arrow/arrow_stream_reader.cpp  | 15 ++++-
 be/src/vec/exec/format/arrow/arrow_stream_reader.h |  1 +
 be/src/vec/exec/format/csv/csv_reader.cpp          |  6 +-
 .../file_reader/new_plain_text_line_reader.cpp     | 20 ++----
 .../file_reader/new_plain_text_line_reader.h       |  2 -
 be/src/vec/exec/format/generic_reader.h            |  5 ++
 be/src/vec/exec/format/json/new_json_reader.cpp    | 15 ++---
 be/src/vec/exec/format/json/new_json_reader.h      |  2 -
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 40 ++++++------
 be/src/vec/exec/format/orc/vorc_reader.h           | 37 ++++++-----
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 51 ++++++++++------
 be/src/vec/exec/format/parquet/vparquet_reader.h   | 14 +++--
 be/src/vec/exec/format/table/table_format_reader.h |  2 +
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  9 +++
 be/src/vec/exec/scan/vfile_scanner.cpp             | 71 +++++++++++++++++++++-
 be/src/vec/exec/scan/vfile_scanner.h               |  6 ++
 21 files changed, 276 insertions(+), 91 deletions(-)
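
The change applies one pattern across the format readers below: when an IOContext is
available, the underlying FileReader is wrapped in the new io::TracingFileReader so that
read bytes, calls, and time accumulate in io_ctx->file_reader_stats, which VFileScanner
later flushes to the scanner profile, query statistics, and BE metrics. A minimal sketch
of that wiring, for illustration only (the helper name is hypothetical and the extra null
check on file_reader_stats is an assumption, not part of the patch):

    #include "io/fs/file_reader.h"
    #include "io/fs/tracing_file_reader.h"
    #include "io/io_common.h"

    namespace doris::io {

    // Wrap a reader with tracing only when stats can actually be collected;
    // otherwise hand back the original reader unchanged.
    inline FileReaderSPtr maybe_trace(FileReaderSPtr inner, IOContext* io_ctx) {
        if (io_ctx != nullptr && io_ctx->file_reader_stats != nullptr) {
            return std::make_shared<TracingFileReader>(std::move(inner),
                                                       io_ctx->file_reader_stats);
        }
        return inner;
    }

    } // namespace doris::io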

diff --git a/be/src/apache-orc b/be/src/apache-orc
index ef68c6ff736..70b673c7d29 160000
--- a/be/src/apache-orc
+++ b/be/src/apache-orc
@@ -1 +1 @@
-Subproject commit ef68c6ff736a84c8c7185d4a08397c67eff53ad6
+Subproject commit 70b673c7d299690ced7c4c600fabaee9e1601198
diff --git a/be/src/io/fs/tracing_file_reader.h b/be/src/io/fs/tracing_file_reader.h
new file mode 100644
index 00000000000..84eb3dfc8fb
--- /dev/null
+++ b/be/src/io/fs/tracing_file_reader.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include "common/status.h"
+#include "io/fs/file_reader.h"
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+namespace io {
+
+class TracingFileReader : public FileReader {
+public:
+    TracingFileReader(doris::io::FileReaderSPtr inner, FileReaderStats* stats)
+            : _inner(std::move(inner)), _stats(stats) {}
+
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const IOContext* io_ctx) override {
+        SCOPED_RAW_TIMER(&_stats->read_time_ns);
+        Status st = _inner->read_at(offset, result, bytes_read, io_ctx);
+        _stats->read_calls++;
+        _stats->read_bytes += *bytes_read;
+        return st;
+    }
+
+    Status close() override { return _inner->close(); }
+    const doris::io::Path& path() const override { return _inner->path(); }
+    size_t size() const override { return _inner->size(); }
+    bool closed() const override { return _inner->closed(); }
+    const std::string& get_data_dir_path() override { return _inner->get_data_dir_path(); }
+
+    void _collect_profile_at_runtime() override { return _inner->collect_profile_at_runtime(); }
+    void _collect_profile_before_close() override { return _inner->collect_profile_before_close(); }
+
+    FileReaderStats* stats() const { return _stats; }
+
+private:
+    doris::io::FileReaderSPtr _inner;
+    FileReaderStats* _stats;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index 909941181d3..6934aa6a75a 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -35,6 +35,13 @@ enum class ReaderType : uint8_t {
 
 namespace io {
 
+struct FileReaderStats {
+    size_t read_calls = 0;
+    size_t read_bytes = 0;
+    int64_t read_time_ns = 0;
+    size_t read_rows = 0;
+};
+
 struct FileCacheStatistics {
     int64_t num_local_io_total = 0;
     int64_t num_remote_io_total = 0;
@@ -73,6 +80,7 @@ struct IOContext {
     int64_t expiration_time = 0;
     const TUniqueId* query_id = nullptr;             // Ref
     FileCacheStatistics* file_cache_stats = nullptr; // Ref
+    FileReaderStats* file_reader_stats = nullptr;    // Ref
     bool is_inverted_index = false;
     // if is_dryrun, read IO will download data to cache but return no data to reader
     // useful to skip cache data read from local disk to accelarate warm up
diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h
index 25635dcdd62..f4a89d4bdec 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -55,6 +55,7 @@ public:
     std::string name_suffix() const override;
 
 private:
+    friend class vectorized::VFileScanner;
     std::shared_ptr<vectorized::SplitSourceConnector> _split_source = nullptr;
     int _max_scanners;
     // A in memory cache to save some common components
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index d33a8240a95..a1c5c98ce45 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -825,6 +825,8 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
         io::IOContext io_ctx;
         io::FileCacheStatistics file_cache_statis;
         io_ctx.file_cache_stats = &file_cache_statis;
+        io::FileReaderStats file_reader_stats;
+        io_ctx.file_reader_stats = &file_reader_stats;
         // file_slots is no use, but the lifetime should be longer than reader
         std::vector<SlotDescriptor*> file_slots;
         switch (params.format_type) {
diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
index efe8e36bf20..f5888c88ce2 100644
--- a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
+++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
@@ -25,6 +25,7 @@
 #include "arrow_pip_input_stream.h"
 #include "common/logging.h"
 #include "io/fs/stream_load_pipe.h"
+#include "io/fs/tracing_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
 #include "vec/core/block.h"
@@ -43,14 +44,22 @@ ArrowStreamReader::ArrowStreamReader(RuntimeState* state, RuntimeProfile* profil
                                      const TFileRangeDesc& range,
                                      const std::vector<SlotDescriptor*>& file_slot_descs,
                                      io::IOContext* io_ctx)
-        : _state(state), _range(range), _file_slot_descs(file_slot_descs), _file_reader(nullptr) {
+        : _state(state),
+          _range(range),
+          _file_slot_descs(file_slot_descs),
+          _io_ctx(io_ctx),
+          _file_reader(nullptr) {
     TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
 }
 
 ArrowStreamReader::~ArrowStreamReader() = default;
 
 Status ArrowStreamReader::init_reader() {
-    RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false));
+    io::FileReaderSPtr file_reader;
+    RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &file_reader, _state, false));
+    _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
+                                                                     _io_ctx->file_reader_stats)
+                           : file_reader;
     _pip_stream = ArrowPipInputStream::create_unique(_file_reader);
     return Status::OK();
 }
@@ -121,4 +130,4 @@ Status ArrowStreamReader::get_columns(std::unordered_map<std::string, TypeDescri
 }
 
 #include "common/compile_check_end.h"
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.h b/be/src/vec/exec/format/arrow/arrow_stream_reader.h
index 830753326b9..c679361e243 100644
--- a/be/src/vec/exec/format/arrow/arrow_stream_reader.h
+++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.h
@@ -65,6 +65,7 @@ private:
     RuntimeState* _state;
     const TFileRangeDesc& _range;
     const std::vector<SlotDescriptor*>& _file_slot_descs;
+    io::IOContext* _io_ctx;
     io::FileReaderSPtr _file_reader;
     std::unique_ptr<doris::vectorized::ArrowPipInputStream> _pip_stream;
     cctz::time_zone _ctzz;
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index a853137eeb0..8eef40f7875 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -39,6 +39,7 @@
 #include "io/fs/buffered_reader.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/s3_file_reader.h"
+#include "io/fs/tracing_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
 #include "util/string_util.h"
@@ -541,10 +542,13 @@ Status CsvReader::_create_file_reader(bool need_schema) {
         _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
         io::FileReaderOptions reader_options =
                 FileFactory::get_reader_options(_state, _file_description);
-        _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+        auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
                 _profile, _system_properties, _file_description, reader_options,
                 io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
                 io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
+        _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
+                                                                         _io_ctx->file_reader_stats)
+                               : file_reader;
     }
     if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
         _params.file_type != TFileType::FILE_BROKER) {
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index e66c622a1e9..a068a748b11 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -221,12 +221,8 @@ NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile,
           _more_input_bytes(0),
           _more_output_bytes(0),
           _current_offset(current_offset),
-          _bytes_read_counter(nullptr),
-          _read_timer(nullptr),
           _bytes_decompress_counter(nullptr),
           _decompress_timer(nullptr) {
-    _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
-    _read_timer = ADD_TIMER(_profile, "FileReadTime");
     _bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed", TUnit::BYTES);
     _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
 }
@@ -384,16 +380,12 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool
                     }
                 }
 
-                {
-                    SCOPED_TIMER(_read_timer);
-                    Slice file_slice(file_buf, buffer_len);
-                    RETURN_IF_ERROR(
-                            _file_reader->read_at(_current_offset, file_slice, &read_len, io_ctx));
-                    _current_offset += read_len;
-                    if (read_len == 0) {
-                        _file_eof = true;
-                    }
-                    COUNTER_UPDATE(_bytes_read_counter, read_len);
+                Slice file_slice(file_buf, buffer_len);
+                RETURN_IF_ERROR(
+                        _file_reader->read_at(_current_offset, file_slice, &read_len, io_ctx));
+                _current_offset += read_len;
+                if (read_len == 0) {
+                    _file_eof = true;
                 }
                 if (_file_eof || read_len == 0) {
                     if (!stream_end) {
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
index 3e8983214c0..730dc2e9cd9 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
@@ -332,8 +332,6 @@ private:
     size_t _current_offset;
 
     // Profile counters
-    RuntimeProfile::Counter* _bytes_read_counter = nullptr;
-    RuntimeProfile::Counter* _read_timer = nullptr;
     RuntimeProfile::Counter* _bytes_decompress_counter = nullptr;
     RuntimeProfile::Counter* _decompress_timer = nullptr;
 };
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index 2dfe906bf77..c3efc321e2f 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -74,6 +74,11 @@ public:
 
     virtual Status close() { return Status::OK(); }
 
+    /// The reader is responsible for counting the number of rows read,
+    /// because some readers, such as parquet/orc,
+    /// can skip some pages/rowgroups through indexes.
+    virtual bool count_read_rows() { return false; }
+
 protected:
     const size_t _MIN_BATCH_SIZE = 4064; // 4094 - 32(padding)
 
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index d9956e99333..fae64caa119 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -46,6 +46,7 @@
 #include "io/fs/buffered_reader.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/stream_load_pipe.h"
+#include "io/fs/tracing_file_reader.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
@@ -95,9 +96,7 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann
           _scanner_eof(scanner_eof),
           _current_offset(0),
           _io_ctx(io_ctx) {
-    _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
     _read_timer = ADD_TIMER(_profile, "ReadTime");
-    _file_read_timer = ADD_TIMER(_profile, "FileReadTime");
     if (_range.__isset.compress_type) {
         // for compatibility
         _file_compress_type = _range.compress_type;
@@ -438,10 +437,13 @@ Status NewJsonReader::_open_file_reader(bool need_schema) {
         _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
         io::FileReaderOptions reader_options =
                 FileFactory::get_reader_options(_state, _file_description);
-        _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+        auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
                 _profile, _system_properties, _file_description, reader_options,
                 io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
                 io::PrefetchRange(_range.start_offset, _range.size)));
+        _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
+                                                                         _io_ctx->file_reader_stats)
+                               : file_reader;
     }
     return Status::OK();
 }
@@ -662,7 +664,7 @@ Status NewJsonReader::_parse_json(bool* is_empty_row, bool* eof) {
 // return Status::OK() if parse succeed or reach EOF.
 Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) {
     // read a whole message
-    SCOPED_TIMER(_file_read_timer);
+    SCOPED_TIMER(_read_timer);
     const uint8_t* json_str = nullptr;
     std::unique_ptr<uint8_t[]> json_str_ptr;
     if (_line_reader != nullptr) {
@@ -675,7 +677,6 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) {
         }
     }
 
-    _bytes_read_counter += *size;
     if (*eof) {
         return Status::OK();
     }
@@ -1931,7 +1932,7 @@ Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st
 
 Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
                                            simdjson::error_code* error) {
-    SCOPED_TIMER(_file_read_timer);
+    SCOPED_TIMER(_read_timer);
     // step1: read buf from pipe.
     if (_line_reader != nullptr) {
         RETURN_IF_ERROR(_line_reader->read_line(&_json_str, size, eof, _io_ctx));
@@ -1992,7 +1993,7 @@ Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* is_empty_row
 
 Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
                                       bool* is_empty_row) {
-    SCOPED_TIMER(_file_read_timer);
+    SCOPED_TIMER(_read_timer);
     auto return_quality_error = [&](fmt::memory_buffer& error_msg,
                                     const std::string& doc_info) -> Status {
         _counter->num_rows_filtered++;
diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h
index 7805286bc84..9f0a39a98b7 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -261,9 +261,7 @@ private:
 
     io::IOContext* _io_ctx = nullptr;
 
-    RuntimeProfile::Counter* _bytes_read_counter = nullptr;
     RuntimeProfile::Counter* _read_timer = nullptr;
-    RuntimeProfile::Counter* _file_read_timer = nullptr;
 
     // ======SIMD JSON======
     // name mapping
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index a35155199eb..b8952c7a695 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -113,9 +113,6 @@ static constexpr int decimal_scale_for_hive11 = 10;
     M(TypeIndex::Float64, Float64, orc::DoubleVectorBatch)
 
 void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
-    _statistics->fs_read_calls++;
-    _statistics->fs_read_bytes += length;
-    SCOPED_RAW_TIMER(&_statistics->fs_read_time);
     uint64_t has_read = 0;
     char* out = reinterpret_cast<char*>(buf);
     while (has_read < length) {
@@ -124,7 +121,7 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
         }
         size_t loop_read;
         Slice result(out + has_read, length - has_read);
-        Status st = _file_reader->read_at(offset + has_read, result, &loop_read, _io_ctx);
+        Status st = _tracing_file_reader->read_at(offset + has_read, result, &loop_read, _io_ctx);
         if (!st.ok()) {
             throw orc::ParseError(
                     strings::Substitute("Failed to read $0: $1", _file_name, st.to_string()));
@@ -141,9 +138,6 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
 }
 
 void StripeStreamInputStream::read(void* buf, uint64_t length, uint64_t offset) {
-    _statistics->fs_read_calls++;
-    _statistics->fs_read_bytes += length;
-    SCOPED_RAW_TIMER(&_statistics->fs_read_time);
     uint64_t has_read = 0;
     char* out = reinterpret_cast<char*>(buf);
     while (has_read < length) {
@@ -217,9 +211,6 @@ OrcReader::~OrcReader() {
 
 void OrcReader::_collect_profile_before_close() {
     if (_profile != nullptr) {
-        COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time);
-        COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls);
-        COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes);
         COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time);
         COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time);
         COUNTER_UPDATE(_orc_profile.create_reader_time, _statistics.create_reader_time);
@@ -245,10 +236,6 @@ void OrcReader::_init_profile() {
     if (_profile != nullptr) {
         static const char* orc_profile = "OrcReader";
         ADD_TIMER_WITH_LEVEL(_profile, orc_profile, 1);
-        _orc_profile.read_time = ADD_TIMER_WITH_LEVEL(_profile, "FileReadTime", 1);
-        _orc_profile.read_calls = ADD_COUNTER_WITH_LEVEL(_profile, "FileReadCalls", TUnit::UNIT, 1);
-        _orc_profile.read_bytes =
-                ADD_COUNTER_WITH_LEVEL(_profile, "FileReadBytes", TUnit::BYTES, 1);
         _orc_profile.column_read_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ColumnReadTime", orc_profile, 1);
         _orc_profile.get_batch_time =
@@ -290,7 +277,7 @@ Status OrcReader::_create_file_reader() {
                 _profile, _system_properties, _file_description, reader_options,
                 io::DelegateReader::AccessMode::RANDOM, _io_ctx));
         _file_input_stream = std::make_unique<ORCFileInputStream>(
-                _scan_range.path, std::move(inner_reader), &_statistics, _io_ctx, _profile,
+                _scan_range.path, std::move(inner_reader), _io_ctx, _profile,
                 _orc_once_max_read_bytes, _orc_max_merge_distance_bytes);
     }
     if (_file_input_stream->getLength() == 0) {
@@ -1840,6 +1827,9 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
                        _reader_metrics.SelectedRowGroupCount);
         COUNTER_UPDATE(_orc_profile.evaluated_row_group_count,
                        _reader_metrics.EvaluatedRowGroupCount);
+        if (_io_ctx) {
+            _io_ctx->file_reader_stats->read_rows += _reader_metrics.ReadRowCount;
+        }
     }
     if (_orc_filter) {
         RETURN_IF_ERROR(_orc_filter->get_status());
@@ -2757,6 +2747,14 @@ void ORCFileInputStream::_build_small_ranges_input_stripe_streams(
         auto merge_range_file_reader =
                 std::make_shared<OrcMergeRangeFileReader>(_profile, _file_reader, merged_range);
 
+        std::shared_ptr<io::FileReader> tracing_file_reader;
+        if (_io_ctx) {
+            tracing_file_reader = std::make_shared<io::TracingFileReader>(
+                    std::move(merge_range_file_reader), _io_ctx->file_reader_stats);
+        } else {
+            tracing_file_reader = std::move(merge_range_file_reader);
+        }
+
         // Use binary search to find the starting point in sorted_ranges
         auto it =
                 std::lower_bound(sorted_ranges.begin(), sorted_ranges.end(),
@@ -2769,7 +2767,7 @@ void ORCFileInputStream::_build_small_ranges_input_stripe_streams(
              ++it) {
             if (it->second.end_offset <= merged_range.end_offset) {
                 auto stripe_stream_input_stream = std::make_shared<StripeStreamInputStream>(
-                        getName(), merge_range_file_reader, _statistics, _io_ctx, _profile);
+                        getName(), tracing_file_reader, _io_ctx, _profile);
                 streams.emplace(it->first, stripe_stream_input_stream);
                 _stripe_streams.emplace_back(stripe_stream_input_stream);
             }
@@ -2782,10 +2780,12 @@ void ORCFileInputStream::_build_large_ranges_input_stripe_streams(
         std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
     for (const auto& range : ranges) {
         auto stripe_stream_input_stream = std::make_shared<StripeStreamInputStream>(
-                getName(), _file_reader, _statistics, _io_ctx, _profile);
-        streams.emplace(range.first,
-                        std::make_shared<StripeStreamInputStream>(getName(), _file_reader,
-                                                                  _statistics, _io_ctx, _profile));
+                getName(),
+                _io_ctx ? std::make_shared<io::TracingFileReader>(_file_reader,
+                                                                  _io_ctx->file_reader_stats)
+                        : _file_reader,
+                _io_ctx, _profile);
+        streams.emplace(range.first, stripe_stream_input_stream);
         _stripe_streams.emplace_back(stripe_stream_input_stream);
     }
 }
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index cc33034b2cf..febb7202857 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -38,6 +38,7 @@
 #include "io/fs/buffered_reader.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/tracing_file_reader.h"
 #include "olap/olap_common.h"
 #include "orc/Reader.hh"
 #include "orc/Type.hh"
@@ -121,9 +122,6 @@ public:
     }
 
     struct Statistics {
-        int64_t fs_read_time = 0;
-        int64_t fs_read_calls = 0;
-        int64_t fs_read_bytes = 0;
         int64_t column_read_time = 0;
         int64_t get_batch_time = 0;
         int64_t create_reader_time = 0;
@@ -220,6 +218,8 @@ public:
     }
     static const orc::Type& remove_acid(const orc::Type& type);
 
+    bool count_read_rows() override { return true; }
+
 protected:
     void _collect_profile_before_close() override;
 
@@ -686,11 +686,9 @@ private:
 class StripeStreamInputStream : public orc::InputStream, public ProfileCollector {
 public:
     StripeStreamInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader,
-                            OrcReader::Statistics* statistics, const io::IOContext* io_ctx,
-                            RuntimeProfile* profile)
+                            const io::IOContext* io_ctx, RuntimeProfile* profile)
             : _file_name(file_name),
               _inner_reader(inner_reader),
-              _statistics(statistics),
               _io_ctx(io_ctx),
               _profile(profile) {}
 
@@ -727,7 +725,6 @@ private:
     const std::string& _file_name;
     io::FileReaderSPtr _inner_reader;
     // Owned by OrcReader
-    OrcReader::Statistics* _statistics = nullptr;
     const io::IOContext* _io_ctx = nullptr;
     RuntimeProfile* _profile = nullptr;
 };
@@ -735,21 +732,22 @@ private:
 class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
 public:
     ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader,
-                       OrcReader::Statistics* statistics, const io::IOContext* io_ctx,
-                       RuntimeProfile* profile, int64_t orc_once_max_read_bytes,
-                       int64_t orc_max_merge_distance_bytes)
+                       const io::IOContext* io_ctx, RuntimeProfile* profile,
+                       int64_t orc_once_max_read_bytes, int64_t orc_max_merge_distance_bytes)
             : _file_name(file_name),
               _inner_reader(inner_reader),
               _file_reader(inner_reader),
+              _tracing_file_reader(io_ctx ? std::make_shared<io::TracingFileReader>(
+                                                    _file_reader, io_ctx->file_reader_stats)
+                                          : _file_reader),
               _orc_once_max_read_bytes(orc_once_max_read_bytes),
               _orc_max_merge_distance_bytes(orc_max_merge_distance_bytes),
-              _statistics(statistics),
               _io_ctx(io_ctx),
               _profile(profile) {}
 
     ~ORCFileInputStream() override {
-        if (_file_reader != nullptr) {
-            _file_reader->collect_profile_before_close();
+        if (_tracing_file_reader != nullptr) {
+            _tracing_file_reader->collect_profile_before_close();
         }
         for (const auto& stripe_stream : _stripe_streams) {
             if (stripe_stream != nullptr) {
@@ -759,7 +757,7 @@ public:
         _stripe_streams.clear();
     }
 
-    uint64_t getLength() const override { return _file_reader->size(); }
+    uint64_t getLength() const override { return _tracing_file_reader->size(); }
 
     uint64_t getNaturalReadSize() const override { return config::orc_natural_read_size_mb << 20; }
 
@@ -778,6 +776,8 @@ public:
 
     io::FileReaderSPtr& get_inner_reader() { return _inner_reader; }
 
+    io::FileReaderSPtr& get_tracing_file_reader() { return _tracing_file_reader; }
+
 protected:
     void _collect_profile_at_runtime() override {};
     void _collect_profile_before_close() override;
@@ -796,8 +796,16 @@ private:
             std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams);
 
     const std::string& _file_name;
+
+    // _inner_reader is original file reader.
+    // _file_reader == RangeCacheFileReader used by tiny stripe case, if not tiny stripe case,
+    // _file_reader == _inner_reader.
+    // _tracing_file_reader is tracing file reader with io context.
+    // If io_ctx is null, _tracing_file_reader will be the same as _file_reader.
     io::FileReaderSPtr _inner_reader;
     io::FileReaderSPtr _file_reader;
+    io::FileReaderSPtr _tracing_file_reader;
+
     bool _is_all_tiny_stripes = false;
     int64_t _orc_once_max_read_bytes;
     int64_t _orc_max_merge_distance_bytes;
@@ -805,7 +813,6 @@ private:
     std::vector<std::shared_ptr<StripeStreamInputStream>> _stripe_streams;
 
     // Owned by OrcReader
-    OrcReader::Statistics* _statistics = nullptr;
     const io::IOContext* _io_ctx = nullptr;
     RuntimeProfile* _profile = nullptr;
 };
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 316b19701c7..331e9696250 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -32,6 +32,7 @@
 #include "io/fs/buffered_reader.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/tracing_file_reader.h"
 #include "parquet_pred_cmp.h"
 #include "parquet_thrift_util.h"
 #include "runtime/define_primitive_type.h"
@@ -118,6 +119,12 @@ ParquetReader::~ParquetReader() {
     _close_internal();
 }
 
+// for unit test
+void ParquetReader::set_file_reader(io::FileReaderSPtr file_reader) {
+    _file_reader = file_reader;
+    _tracing_file_reader = file_reader;
+}
+
 void ParquetReader::_init_profile() {
     if (_profile != nullptr) {
         static const char* parquet_profile = "ParquetReader";
@@ -157,14 +164,8 @@ void ParquetReader::_init_profile() {
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageIndexParseTime", parquet_profile, 1);
         _parquet_profile.row_group_filter_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RowGroupFilterTime", parquet_profile, 1);
-
-        _parquet_profile.file_read_time = ADD_TIMER_WITH_LEVEL(_profile, "FileReadTime", 1);
-        _parquet_profile.file_read_calls =
-                ADD_COUNTER_WITH_LEVEL(_profile, "FileReadCalls", TUnit::UNIT, 1);
         _parquet_profile.file_meta_read_calls =
                 ADD_COUNTER_WITH_LEVEL(_profile, "FileMetaReadCalls", TUnit::UNIT, 1);
-        _parquet_profile.file_read_bytes =
-                ADD_COUNTER_WITH_LEVEL(_profile, "FileReadBytes", TUnit::BYTES, 1);
         _parquet_profile.decompress_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecompressTime", parquet_profile, 1);
         _parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL(
@@ -215,18 +216,22 @@ Status ParquetReader::_open_file() {
         _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
                 _profile, _system_properties, _file_description, reader_options,
                 io::DelegateReader::AccessMode::RANDOM, _io_ctx));
+        _tracing_file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(
+                                                 _file_reader, _io_ctx->file_reader_stats)
+                                       : _file_reader;
     }
     if (_file_metadata == nullptr) {
         SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
-        if (_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) {
+        if (_tracing_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) {
             // Some system may generate parquet file with only 4 bytes: PAR1
             // Should consider it as empty file.
             return Status::EndOfFile("open file failed, empty parquet file {} with size: {}",
-                                     _scan_range.path, _file_reader->size());
+                                     _scan_range.path, _tracing_file_reader->size());
         }
         size_t meta_size = 0;
         if (_meta_cache == nullptr) {
-            auto st = parse_thrift_footer(_file_reader, &_file_metadata, &meta_size, _io_ctx);
+            auto st =
+                    parse_thrift_footer(_tracing_file_reader, &_file_metadata, &meta_size, _io_ctx);
             // wrap it with unique ptr, so that it can be released finally.
             _file_metadata_ptr.reset(_file_metadata);
             RETURN_IF_ERROR(st);
@@ -235,7 +240,7 @@ Status ParquetReader::_open_file() {
             // parse magic number & parse meta data
             _column_statistics.meta_read_calls += 1;
         } else {
-            RETURN_IF_ERROR(_meta_cache->get_parquet_footer(_file_reader, _io_ctx,
+            RETURN_IF_ERROR(_meta_cache->get_parquet_footer(_tracing_file_reader, _io_ctx,
                                                             _file_description.mtime, &meta_size,
                                                             &_meta_cache_handle));
             _column_statistics.read_bytes += meta_size;
@@ -608,9 +613,12 @@ Status ParquetReader::_next_row_group_reader() {
                                               _profile, _file_reader, io_ranges)
                                     : _file_reader;
     }
-    _current_group_reader.reset(new RowGroupReader(
-            group_file_reader, _read_table_columns, row_group_index.row_group_id, row_group, _ctz,
-            _io_ctx, position_delete_ctx, _lazy_read_ctx, _state));
+    _current_group_reader.reset(
+            new RowGroupReader(_io_ctx ? std::make_shared<io::TracingFileReader>(
+                                                 group_file_reader, _io_ctx->file_reader_stats)
+                                       : group_file_reader,
+                               _read_table_columns, row_group_index.row_group_id, row_group, _ctz,
+                               _io_ctx, position_delete_ctx, _lazy_read_ctx, _state));
     _row_group_eof = false;
 
     _current_group_reader->_table_info_node_ptr = _table_info_node_ptr;
@@ -748,6 +756,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
     std::function<void()> read_whole_row_group = [&]() {
         candidate_row_ranges.emplace_back(0, row_group.num_rows);
         _statistics.read_rows += row_group.num_rows;
+        if (_io_ctx) {
+            _io_ctx->file_reader_stats->read_rows += row_group.num_rows;
+        }
     };
 
     if ((!_enable_filter_by_min_max) || _lazy_read_ctx.has_complex_type ||
@@ -766,8 +777,8 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
     Slice result(col_index_buff.data(), page_index._column_index_size);
     {
         SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
-        RETURN_IF_ERROR(_file_reader->read_at(page_index._column_index_start, result, &bytes_read,
-                                              _io_ctx));
+        RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._column_index_start, result,
+                                                      &bytes_read, _io_ctx));
     }
     _column_statistics.read_bytes += bytes_read;
     auto& schema_desc = _file_metadata->schema();
@@ -776,8 +787,8 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
     Slice res(off_index_buff.data(), page_index._offset_index_size);
     {
         SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
-        RETURN_IF_ERROR(
-                _file_reader->read_at(page_index._offset_index_start, res, &bytes_read, _io_ctx));
+        RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._offset_index_start, res,
+                                                      &bytes_read, _io_ctx));
     }
     _column_statistics.read_bytes += bytes_read;
     // read twice: parse column index & parse offset index
@@ -856,6 +867,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
         read_rows += row_group.num_rows - skip_end;
     }
     _statistics.read_rows += read_rows;
+    if (_io_ctx) {
+        _io_ctx->file_reader_stats->read_rows += read_rows;
+    }
     _statistics.filtered_page_rows += row_group.num_rows - read_rows;
     return Status::OK();
 }
@@ -1006,10 +1020,7 @@ void ParquetReader::_collect_profile() {
                    _column_statistics.parse_page_header_num);
     COUNTER_UPDATE(_parquet_profile.predicate_filter_time, _statistics.predicate_filter_time);
     COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time);
-    COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time);
-    COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls);
     COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, _column_statistics.meta_read_calls);
-    COUNTER_UPDATE(_parquet_profile.file_read_bytes, _column_statistics.read_bytes);
     COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time);
     COUNTER_UPDATE(_parquet_profile.decompress_cnt, _column_statistics.decompress_cnt);
     COUNTER_UPDATE(_parquet_profile.decode_header_time, _column_statistics.decode_header_time);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 814964833db..c560b1c3800 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -105,8 +105,8 @@ public:
                   io::IOContext* io_ctx, RuntimeState* state, bool enable_lazy_mat = true);
 
     ~ParquetReader() override;
-    // for test
-    void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader = file_reader; }
+    // for unit test
+    void set_file_reader(io::FileReaderSPtr file_reader);
 
     Status init_reader(
             const std::vector<std::string>& all_column_names,
@@ -153,6 +153,8 @@ public:
 
     Status get_file_metadata_schema(const FieldDescriptor** ptr);
 
+    bool count_read_rows() override { return true; }
+
 protected:
     void _collect_profile_before_close() override;
 
@@ -176,10 +178,7 @@ private:
         RuntimeProfile::Counter* read_page_index_time = nullptr;
         RuntimeProfile::Counter* parse_page_index_time = nullptr;
 
-        RuntimeProfile::Counter* file_read_time = nullptr;
-        RuntimeProfile::Counter* file_read_calls = nullptr;
         RuntimeProfile::Counter* file_meta_read_calls = nullptr;
-        RuntimeProfile::Counter* file_read_bytes = nullptr;
         RuntimeProfile::Counter* decompress_time = nullptr;
         RuntimeProfile::Counter* decompress_cnt = nullptr;
         RuntimeProfile::Counter* decode_header_time = nullptr;
@@ -244,7 +243,12 @@ private:
     FileMetaData* _file_metadata = nullptr;
     const tparquet::FileMetaData* _t_metadata = nullptr;
 
+    // _tracing_file_reader wraps _file_reader.
+    // _file_reader is original file reader.
+    // _tracing_file_reader is tracing file reader with io context.
+    // If io_ctx is null, _tracing_file_reader will be the same as file_reader.
     io::FileReaderSPtr _file_reader = nullptr;
+    io::FileReaderSPtr _tracing_file_reader = nullptr;
     std::unique_ptr<RowGroupReader> _current_group_reader;
     // read to the end of current reader
     bool _row_group_eof = true;
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index fbba6a51033..1f79de54ed4 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -104,6 +104,8 @@ public:
 
     virtual Status init_row_filters() = 0;
 
+    bool count_read_rows() override { return _file_format_reader->count_read_rows(); }
+
 protected:
     std::string _table_format;                          // hudi, iceberg, paimon
     std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index be3b5d3d61b..e65a761a182 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -556,9 +556,18 @@ void NewOlapScanner::update_realtime_counters() {
     // In case of no cache, we still need to update the IO stats. uncompressed bytes read == local + remote
     if (stats.file_cache_stats.bytes_read_from_local == 0 &&
         stats.file_cache_stats.bytes_read_from_remote == 0) {
+        if (_query_statistics) {
+            _query_statistics->add_scan_bytes_from_local_storage(stats.compressed_bytes_read);
+        }
         DorisMetrics::instance()->query_scan_bytes_from_local->increment(
                 stats.compressed_bytes_read);
     } else {
+        if (_query_statistics) {
+            _query_statistics->add_scan_bytes_from_local_storage(
+                    stats.file_cache_stats.bytes_read_from_local);
+            _query_statistics->add_scan_bytes_from_remote_storage(
+                    stats.file_cache_stats.bytes_read_from_remote);
+        }
         DorisMetrics::instance()->query_scan_bytes_from_local->increment(
                 stats.file_cache_stats.bytes_read_from_local);
         DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 0cba6b8a0c2..b744ae0d174 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -38,6 +38,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "io/cache/block_file_cache_profile.h"
+#include "io/fs/tracing_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
 #include "runtime/types.h"
@@ -146,14 +147,24 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
                                                      "NotFoundFileNum", TUnit::UNIT, 1);
     _file_counter =
             ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
+
+    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
+                                                      "FileReadBytes", TUnit::BYTES, 1);
+    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
+                                                      "FileReadCalls", TUnit::UNIT, 1);
+    _file_read_time_counter =
+            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileReadTime", 1);
+
     _runtime_filter_partition_pruned_range_counter =
             ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
                                    "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);
 
     _file_cache_statistics.reset(new io::FileCacheStatistics());
+    _file_reader_stats.reset(new io::FileReaderStats());
+
     _io_ctx.reset(new io::IOContext());
     _io_ctx->file_cache_stats = _file_cache_statistics.get();
-    _io_ctx->query_id = &_state->query_id();
+    _io_ctx->file_reader_stats = _file_reader_stats.get();
 
     if (_is_load) {
         _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
@@ -435,6 +446,9 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool*
         // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
        // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
         if (read_rows > 0) {
+            if ((!_cur_reader->count_read_rows()) && _io_ctx) {
+                _io_ctx->file_reader_stats->read_rows += read_rows;
+            }
             // If the push_down_agg_type is COUNT, no need to do the rest,
             // because we only save a number in block.
             if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
@@ -1478,6 +1492,49 @@ void VFileScanner::try_stop() {
     }
 }
 
+void VFileScanner::update_realtime_counters() {
+    pipeline::FileScanLocalState* local_state =
+            static_cast<pipeline::FileScanLocalState*>(_local_state);
+
+    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
+    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
+
+    if (_query_statistics) {
+        _query_statistics->add_scan_rows(_file_reader_stats->read_rows);
+        _query_statistics->add_scan_bytes(_file_reader_stats->read_bytes);
+    }
+
+    if (_file_cache_statistics->bytes_read_from_local == 0 &&
+        _file_cache_statistics->bytes_read_from_remote == 0) {
+        if (_query_statistics) {
+            _query_statistics->add_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
+        }
+        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
+                _file_reader_stats->read_bytes);
+    } else {
+        if (_query_statistics) {
+            _query_statistics->add_scan_bytes_from_local_storage(
+                    _file_cache_statistics->bytes_read_from_local);
+            _query_statistics->add_scan_bytes_from_remote_storage(
+                    _file_cache_statistics->bytes_read_from_remote);
+        }
+        DorisMetrics::instance()->query_scan_bytes_from_local->increment(
+                _file_cache_statistics->bytes_read_from_local);
+        DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
+                _file_cache_statistics->bytes_read_from_remote);
+    }
+
+    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
+
+    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
+    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
+
+    _file_reader_stats->read_bytes = 0;
+    _file_reader_stats->read_rows = 0;
+    _file_cache_statistics->bytes_read_from_local = 0;
+    _file_cache_statistics->bytes_read_from_remote = 0;
+}
+
 void VFileScanner::_collect_profile_before_close() {
     VScanner::_collect_profile_before_close();
     if (config::enable_file_cache && _state->query_options().enable_file_cache &&
@@ -1489,6 +1546,18 @@ void VFileScanner::_collect_profile_before_close() {
     if (_cur_reader != nullptr) {
         _cur_reader->collect_profile_before_close();
     }
+
+    pipeline::FileScanLocalState* local_state =
+            static_cast<pipeline::FileScanLocalState*>(_local_state);
+    COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
+    COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
+
+    COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
+    COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
+    COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
+
+    DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
+    DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index d2e6a220a15..705f01962dc 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -81,6 +81,8 @@ public:
 
     std::string get_current_scan_range_name() override { return _current_range_path; }
 
+    void update_realtime_counters() override;
+
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
 
@@ -165,6 +167,7 @@ protected:
     Block _runtime_filter_partition_prune_block;
 
     std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
+    std::unique_ptr<io::FileReaderStats> _file_reader_stats;
     std::unique_ptr<io::IOContext> _io_ctx;
 
     std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
@@ -186,6 +189,9 @@ private:
     RuntimeProfile::Counter* _empty_file_counter = nullptr;
     RuntimeProfile::Counter* _not_found_file_counter = nullptr;
     RuntimeProfile::Counter* _file_counter = nullptr;
+    RuntimeProfile::Counter* _file_read_bytes_counter = nullptr;
+    RuntimeProfile::Counter* _file_read_calls_counter = nullptr;
+    RuntimeProfile::Counter* _file_read_time_counter = nullptr;
     RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;
 
     const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
