This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 03e84346b05 branch-3.1: [enhancement](be_metrics) update scan bytes metric in file_scanner. #53729 (#54222)
03e84346b05 is described below
commit 03e84346b0507b072888a2bc21ca389679f47317
Author: Qi Chen <[email protected]>
AuthorDate: Mon Aug 4 10:01:42 2025 +0800
branch-3.1: [enhancement](be_metrics) update scan bytes metric in file_scanner. #53729 (#54222)
Cherry-pick #53729
---
be/src/apache-orc | 2 +-
be/src/io/fs/tracing_file_reader.h | 58 ++++++++++++++++++
be/src/io/io_common.h | 8 +++
be/src/pipeline/exec/file_scan_operator.h | 1 +
be/src/service/internal_service.cpp | 2 +
.../vec/exec/format/arrow/arrow_stream_reader.cpp | 15 ++++-
be/src/vec/exec/format/arrow/arrow_stream_reader.h | 1 +
be/src/vec/exec/format/csv/csv_reader.cpp | 6 +-
.../file_reader/new_plain_text_line_reader.cpp | 20 ++----
.../file_reader/new_plain_text_line_reader.h | 2 -
be/src/vec/exec/format/generic_reader.h | 5 ++
be/src/vec/exec/format/json/new_json_reader.cpp | 15 ++---
be/src/vec/exec/format/json/new_json_reader.h | 2 -
be/src/vec/exec/format/orc/vorc_reader.cpp | 40 ++++++------
be/src/vec/exec/format/orc/vorc_reader.h | 37 ++++++-----
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 51 ++++++++++------
be/src/vec/exec/format/parquet/vparquet_reader.h | 14 +++--
be/src/vec/exec/format/table/table_format_reader.h | 2 +
be/src/vec/exec/scan/new_olap_scanner.cpp | 9 +++
be/src/vec/exec/scan/vfile_scanner.cpp | 71 +++++++++++++++++++++-
be/src/vec/exec/scan/vfile_scanner.h | 6 ++
21 files changed, 276 insertions(+), 91 deletions(-)
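
[Editorial note, not part of the commit] The heart of this patch is the new io::TracingFileReader: a decorator that forwards each read to the wrapped FileReader while accumulating call, byte, and latency counters into the FileReaderStats struct reachable through IOContext::file_reader_stats; VFileScanner later flushes those counters into its profile (FileReadBytes, FileReadCalls, FileReadTime) and the BE scan metrics. The sketch below is a minimal, self-contained illustration of that decorator idea only; Reader, Stats, TracingReader, and ZeroReader are simplified stand-ins invented for this example, not the actual Doris types.

    #include <chrono>
    #include <cstddef>
    #include <cstdio>
    #include <memory>
    #include <utility>

    // Simplified stand-ins for io::FileReader / io::FileReaderStats.
    struct Stats {
        size_t read_calls = 0;
        size_t read_bytes = 0;
        long long read_time_ns = 0;
    };

    struct Reader {
        virtual ~Reader() = default;
        // Returns the number of bytes actually read into buf.
        virtual size_t read_at(size_t offset, char* buf, size_t len) = 0;
    };

    // Decorator: forwards to the inner reader and accumulates counters,
    // mirroring the role io::TracingFileReader plays in this patch.
    class TracingReader : public Reader {
    public:
        TracingReader(std::shared_ptr<Reader> inner, Stats* stats)
                : _inner(std::move(inner)), _stats(stats) {}

        size_t read_at(size_t offset, char* buf, size_t len) override {
            auto start = std::chrono::steady_clock::now();
            size_t n = _inner->read_at(offset, buf, len);
            auto end = std::chrono::steady_clock::now();
            _stats->read_calls++;
            _stats->read_bytes += n;
            _stats->read_time_ns +=
                    std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
            return n;
        }

    private:
        std::shared_ptr<Reader> _inner;
        Stats* _stats; // owned by the scanner, flushed into profile counters later
    };

    // A dummy inner reader that "reads" len bytes of zeros.
    struct ZeroReader : Reader {
        size_t read_at(size_t, char* buf, size_t len) override {
            for (size_t i = 0; i < len; ++i) buf[i] = 0;
            return len;
        }
    };

    int main() {
        Stats stats;
        TracingReader reader(std::make_shared<ZeroReader>(), &stats);
        char buf[128];
        reader.read_at(0, buf, sizeof(buf));
        reader.read_at(128, buf, sizeof(buf));
        std::printf("calls=%zu bytes=%zu\n", stats.read_calls, stats.read_bytes);
        return 0;
    }

In the patch itself the wrapping is conditional: a reader is wrapped only when an io_ctx is present, otherwise the original, untraced reader is used directly.
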
diff --git a/be/src/apache-orc b/be/src/apache-orc
index ef68c6ff736..70b673c7d29 160000
--- a/be/src/apache-orc
+++ b/be/src/apache-orc
@@ -1 +1 @@
-Subproject commit ef68c6ff736a84c8c7185d4a08397c67eff53ad6
+Subproject commit 70b673c7d299690ced7c4c600fabaee9e1601198
diff --git a/be/src/io/fs/tracing_file_reader.h b/be/src/io/fs/tracing_file_reader.h
new file mode 100644
index 00000000000..84eb3dfc8fb
--- /dev/null
+++ b/be/src/io/fs/tracing_file_reader.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include "common/status.h"
+#include "io/fs/file_reader.h"
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+namespace io {
+
+class TracingFileReader : public FileReader {
+public:
+ TracingFileReader(doris::io::FileReaderSPtr inner, FileReaderStats* stats)
+ : _inner(std::move(inner)), _stats(stats) {}
+
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const IOContext* io_ctx) override {
+ SCOPED_RAW_TIMER(&_stats->read_time_ns);
+ Status st = _inner->read_at(offset, result, bytes_read, io_ctx);
+ _stats->read_calls++;
+ _stats->read_bytes += *bytes_read;
+ return st;
+ }
+
+ Status close() override { return _inner->close(); }
+ const doris::io::Path& path() const override { return _inner->path(); }
+ size_t size() const override { return _inner->size(); }
+ bool closed() const override { return _inner->closed(); }
+ const std::string& get_data_dir_path() override { return _inner->get_data_dir_path(); }
+
+ void _collect_profile_at_runtime() override { return _inner->collect_profile_at_runtime(); }
+ void _collect_profile_before_close() override { return _inner->collect_profile_before_close(); }
+
+ FileReaderStats* stats() const { return _stats; }
+
+private:
+ doris::io::FileReaderSPtr _inner;
+ FileReaderStats* _stats;
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index 909941181d3..6934aa6a75a 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -35,6 +35,13 @@ enum class ReaderType : uint8_t {
namespace io {
+struct FileReaderStats {
+ size_t read_calls = 0;
+ size_t read_bytes = 0;
+ int64_t read_time_ns = 0;
+ size_t read_rows = 0;
+};
+
struct FileCacheStatistics {
int64_t num_local_io_total = 0;
int64_t num_remote_io_total = 0;
@@ -73,6 +80,7 @@ struct IOContext {
int64_t expiration_time = 0;
const TUniqueId* query_id = nullptr; // Ref
FileCacheStatistics* file_cache_stats = nullptr; // Ref
+ FileReaderStats* file_reader_stats = nullptr; // Ref
bool is_inverted_index = false;
// if is_dryrun, read IO will download data to cache but return no data to reader
// useful to skip cache data read from local disk to accelarate warm up
diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h
index 25635dcdd62..f4a89d4bdec 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -55,6 +55,7 @@ public:
std::string name_suffix() const override;
private:
+ friend class vectorized::VFileScanner;
std::shared_ptr<vectorized::SplitSourceConnector> _split_source = nullptr;
int _max_scanners;
// A in memory cache to save some common components
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index d33a8240a95..a1c5c98ce45 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -825,6 +825,8 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
io::IOContext io_ctx;
io::FileCacheStatistics file_cache_statis;
io_ctx.file_cache_stats = &file_cache_statis;
+ io::FileReaderStats file_reader_stats;
+ io_ctx.file_reader_stats = &file_reader_stats;
// file_slots is no use, but the lifetime should be longer than reader
std::vector<SlotDescriptor*> file_slots;
switch (params.format_type) {
diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
index efe8e36bf20..f5888c88ce2 100644
--- a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
+++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
@@ -25,6 +25,7 @@
#include "arrow_pip_input_stream.h"
#include "common/logging.h"
#include "io/fs/stream_load_pipe.h"
+#include "io/fs/tracing_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
@@ -43,14 +44,22 @@ ArrowStreamReader::ArrowStreamReader(RuntimeState* state, RuntimeProfile* profil
const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs,
io::IOContext* io_ctx)
- : _state(state), _range(range), _file_slot_descs(file_slot_descs), _file_reader(nullptr) {
+ : _state(state),
+ _range(range),
+ _file_slot_descs(file_slot_descs),
+ _io_ctx(io_ctx),
+ _file_reader(nullptr) {
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
}
ArrowStreamReader::~ArrowStreamReader() = default;
Status ArrowStreamReader::init_reader() {
- RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, false));
+ io::FileReaderSPtr file_reader;
+ RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &file_reader, _state, false));
+ _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
+ _io_ctx->file_reader_stats)
+ : file_reader;
_pip_stream = ArrowPipInputStream::create_unique(_file_reader);
return Status::OK();
}
@@ -121,4 +130,4 @@ Status ArrowStreamReader::get_columns(std::unordered_map<std::string, TypeDescri
}
#include "common/compile_check_end.h"
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.h b/be/src/vec/exec/format/arrow/arrow_stream_reader.h
index 830753326b9..c679361e243 100644
--- a/be/src/vec/exec/format/arrow/arrow_stream_reader.h
+++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.h
@@ -65,6 +65,7 @@ private:
RuntimeState* _state;
const TFileRangeDesc& _range;
const std::vector<SlotDescriptor*>& _file_slot_descs;
+ io::IOContext* _io_ctx;
io::FileReaderSPtr _file_reader;
std::unique_ptr<doris::vectorized::ArrowPipInputStream> _pip_stream;
cctz::time_zone _ctzz;
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index a853137eeb0..8eef40f7875 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -39,6 +39,7 @@
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/s3_file_reader.h"
+#include "io/fs/tracing_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "util/string_util.h"
@@ -541,10 +542,13 @@ Status CsvReader::_create_file_reader(bool need_schema) {
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
- _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+ auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
+ _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
+ _io_ctx->file_reader_stats)
+ : file_reader;
}
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
_params.file_type != TFileType::FILE_BROKER) {
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index e66c622a1e9..a068a748b11 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -221,12 +221,8 @@ NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile,
_more_input_bytes(0),
_more_output_bytes(0),
_current_offset(current_offset),
- _bytes_read_counter(nullptr),
- _read_timer(nullptr),
_bytes_decompress_counter(nullptr),
_decompress_timer(nullptr) {
- _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
- _read_timer = ADD_TIMER(_profile, "FileReadTime");
_bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed", TUnit::BYTES);
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
}
@@ -384,16 +380,12 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool
}
}
- {
- SCOPED_TIMER(_read_timer);
- Slice file_slice(file_buf, buffer_len);
- RETURN_IF_ERROR(
- _file_reader->read_at(_current_offset, file_slice, &read_len, io_ctx));
- _current_offset += read_len;
- if (read_len == 0) {
- _file_eof = true;
- }
- COUNTER_UPDATE(_bytes_read_counter, read_len);
+ Slice file_slice(file_buf, buffer_len);
+ RETURN_IF_ERROR(
+ _file_reader->read_at(_current_offset, file_slice, &read_len, io_ctx));
+ _current_offset += read_len;
+ if (read_len == 0) {
+ _file_eof = true;
}
if (_file_eof || read_len == 0) {
if (!stream_end) {
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
index 3e8983214c0..730dc2e9cd9 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
@@ -332,8 +332,6 @@ private:
size_t _current_offset;
// Profile counters
- RuntimeProfile::Counter* _bytes_read_counter = nullptr;
- RuntimeProfile::Counter* _read_timer = nullptr;
RuntimeProfile::Counter* _bytes_decompress_counter = nullptr;
RuntimeProfile::Counter* _decompress_timer = nullptr;
};
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index 2dfe906bf77..c3efc321e2f 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -74,6 +74,11 @@ public:
virtual Status close() { return Status::OK(); }
+ /// The reader is responsible for counting the number of rows read,
+ /// because some readers, such as parquet/orc,
+ /// can skip some pages/rowgroups through indexes.
+ virtual bool count_read_rows() { return false; }
+
protected:
const size_t _MIN_BATCH_SIZE = 4064; // 4094 - 32(padding)
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index d9956e99333..fae64caa119 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -46,6 +46,7 @@
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/stream_load_pipe.h"
+#include "io/fs/tracing_file_reader.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
@@ -95,9 +96,7 @@ NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, Scann
_scanner_eof(scanner_eof),
_current_offset(0),
_io_ctx(io_ctx) {
- _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
_read_timer = ADD_TIMER(_profile, "ReadTime");
- _file_read_timer = ADD_TIMER(_profile, "FileReadTime");
if (_range.__isset.compress_type) {
// for compatibility
_file_compress_type = _range.compress_type;
@@ -438,10 +437,13 @@ Status NewJsonReader::_open_file_reader(bool need_schema) {
_file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
- _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+ auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
io::PrefetchRange(_range.start_offset, _range.size)));
+ _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
+ _io_ctx->file_reader_stats)
+ : file_reader;
}
return Status::OK();
}
@@ -662,7 +664,7 @@ Status NewJsonReader::_parse_json(bool* is_empty_row, bool* eof) {
// return Status::OK() if parse succeed or reach EOF.
Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) {
// read a whole message
- SCOPED_TIMER(_file_read_timer);
+ SCOPED_TIMER(_read_timer);
const uint8_t* json_str = nullptr;
std::unique_ptr<uint8_t[]> json_str_ptr;
if (_line_reader != nullptr) {
@@ -675,7 +677,6 @@ Status NewJsonReader::_parse_json_doc(size_t* size, bool* eof) {
}
}
- _bytes_read_counter += *size;
if (*eof) {
return Status::OK();
}
@@ -1931,7 +1932,7 @@ Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::st
Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
simdjson::error_code* error) {
- SCOPED_TIMER(_file_read_timer);
+ SCOPED_TIMER(_read_timer);
// step1: read buf from pipe.
if (_line_reader != nullptr) {
RETURN_IF_ERROR(_line_reader->read_line(&_json_str, size, eof, _io_ctx));
@@ -1992,7 +1993,7 @@ Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* is_empty_row
Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
bool* is_empty_row) {
- SCOPED_TIMER(_file_read_timer);
+ SCOPED_TIMER(_read_timer);
auto return_quality_error = [&](fmt::memory_buffer& error_msg,
const std::string& doc_info) -> Status {
_counter->num_rows_filtered++;
diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h
index 7805286bc84..9f0a39a98b7 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -261,9 +261,7 @@ private:
io::IOContext* _io_ctx = nullptr;
- RuntimeProfile::Counter* _bytes_read_counter = nullptr;
RuntimeProfile::Counter* _read_timer = nullptr;
- RuntimeProfile::Counter* _file_read_timer = nullptr;
// ======SIMD JSON======
// name mapping
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index a35155199eb..b8952c7a695 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -113,9 +113,6 @@ static constexpr int decimal_scale_for_hive11 = 10;
M(TypeIndex::Float64, Float64, orc::DoubleVectorBatch)
void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
- _statistics->fs_read_calls++;
- _statistics->fs_read_bytes += length;
- SCOPED_RAW_TIMER(&_statistics->fs_read_time);
uint64_t has_read = 0;
char* out = reinterpret_cast<char*>(buf);
while (has_read < length) {
@@ -124,7 +121,7 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
}
size_t loop_read;
Slice result(out + has_read, length - has_read);
- Status st = _file_reader->read_at(offset + has_read, result, &loop_read, _io_ctx);
+ Status st = _tracing_file_reader->read_at(offset + has_read, result, &loop_read, _io_ctx);
if (!st.ok()) {
throw orc::ParseError(
strings::Substitute("Failed to read $0: $1", _file_name, st.to_string()));
@@ -141,9 +138,6 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
}
void StripeStreamInputStream::read(void* buf, uint64_t length, uint64_t offset) {
- _statistics->fs_read_calls++;
- _statistics->fs_read_bytes += length;
- SCOPED_RAW_TIMER(&_statistics->fs_read_time);
uint64_t has_read = 0;
char* out = reinterpret_cast<char*>(buf);
while (has_read < length) {
@@ -217,9 +211,6 @@ OrcReader::~OrcReader() {
void OrcReader::_collect_profile_before_close() {
if (_profile != nullptr) {
- COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time);
- COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls);
- COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes);
COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time);
COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time);
COUNTER_UPDATE(_orc_profile.create_reader_time, _statistics.create_reader_time);
@@ -245,10 +236,6 @@ void OrcReader::_init_profile() {
if (_profile != nullptr) {
static const char* orc_profile = "OrcReader";
ADD_TIMER_WITH_LEVEL(_profile, orc_profile, 1);
- _orc_profile.read_time = ADD_TIMER_WITH_LEVEL(_profile, "FileReadTime", 1);
- _orc_profile.read_calls = ADD_COUNTER_WITH_LEVEL(_profile, "FileReadCalls", TUnit::UNIT, 1);
- _orc_profile.read_bytes =
- ADD_COUNTER_WITH_LEVEL(_profile, "FileReadBytes", TUnit::BYTES, 1);
_orc_profile.column_read_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ColumnReadTime", orc_profile, 1);
_orc_profile.get_batch_time =
@@ -290,7 +277,7 @@ Status OrcReader::_create_file_reader() {
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::RANDOM, _io_ctx));
_file_input_stream = std::make_unique<ORCFileInputStream>(
- _scan_range.path, std::move(inner_reader), &_statistics, _io_ctx, _profile,
+ _scan_range.path, std::move(inner_reader), _io_ctx, _profile,
_orc_once_max_read_bytes, _orc_max_merge_distance_bytes);
}
if (_file_input_stream->getLength() == 0) {
@@ -1840,6 +1827,9 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
_reader_metrics.SelectedRowGroupCount);
COUNTER_UPDATE(_orc_profile.evaluated_row_group_count,
_reader_metrics.EvaluatedRowGroupCount);
+ if (_io_ctx) {
+ _io_ctx->file_reader_stats->read_rows += _reader_metrics.ReadRowCount;
+ }
}
if (_orc_filter) {
RETURN_IF_ERROR(_orc_filter->get_status());
@@ -2757,6 +2747,14 @@ void ORCFileInputStream::_build_small_ranges_input_stripe_streams(
auto merge_range_file_reader =
std::make_shared<OrcMergeRangeFileReader>(_profile, _file_reader, merged_range);
+ std::shared_ptr<io::FileReader> tracing_file_reader;
+ if (_io_ctx) {
+ tracing_file_reader = std::make_shared<io::TracingFileReader>(
+ std::move(merge_range_file_reader), _io_ctx->file_reader_stats);
+ } else {
+ tracing_file_reader = std::move(merge_range_file_reader);
+ }
+
// Use binary search to find the starting point in sorted_ranges
auto it =
std::lower_bound(sorted_ranges.begin(), sorted_ranges.end(),
@@ -2769,7 +2767,7 @@ void ORCFileInputStream::_build_small_ranges_input_stripe_streams(
++it) {
if (it->second.end_offset <= merged_range.end_offset) {
auto stripe_stream_input_stream = std::make_shared<StripeStreamInputStream>(
- getName(), merge_range_file_reader, _statistics, _io_ctx, _profile);
+ getName(), tracing_file_reader, _io_ctx, _profile);
streams.emplace(it->first, stripe_stream_input_stream);
_stripe_streams.emplace_back(stripe_stream_input_stream);
}
@@ -2782,10 +2780,12 @@ void ORCFileInputStream::_build_large_ranges_input_stripe_streams(
std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
for (const auto& range : ranges) {
auto stripe_stream_input_stream = std::make_shared<StripeStreamInputStream>(
- getName(), _file_reader, _statistics, _io_ctx, _profile);
- streams.emplace(range.first,
- std::make_shared<StripeStreamInputStream>(getName(), _file_reader,
- _statistics, _io_ctx, _profile));
+ getName(),
+ _io_ctx ? std::make_shared<io::TracingFileReader>(_file_reader,
+ _io_ctx->file_reader_stats)
+ : _file_reader,
+ _io_ctx, _profile);
+ streams.emplace(range.first, stripe_stream_input_stream);
_stripe_streams.emplace_back(stripe_stream_input_stream);
}
}
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index cc33034b2cf..febb7202857 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -38,6 +38,7 @@
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/tracing_file_reader.h"
#include "olap/olap_common.h"
#include "orc/Reader.hh"
#include "orc/Type.hh"
@@ -121,9 +122,6 @@ public:
}
struct Statistics {
- int64_t fs_read_time = 0;
- int64_t fs_read_calls = 0;
- int64_t fs_read_bytes = 0;
int64_t column_read_time = 0;
int64_t get_batch_time = 0;
int64_t create_reader_time = 0;
@@ -220,6 +218,8 @@ public:
}
static const orc::Type& remove_acid(const orc::Type& type);
+ bool count_read_rows() override { return true; }
+
protected:
void _collect_profile_before_close() override;
@@ -686,11 +686,9 @@ private:
class StripeStreamInputStream : public orc::InputStream, public ProfileCollector {
public:
StripeStreamInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader,
- OrcReader::Statistics* statistics, const io::IOContext* io_ctx,
- RuntimeProfile* profile)
+ const io::IOContext* io_ctx, RuntimeProfile* profile)
: _file_name(file_name),
_inner_reader(inner_reader),
- _statistics(statistics),
_io_ctx(io_ctx),
_profile(profile) {}
@@ -727,7 +725,6 @@ private:
const std::string& _file_name;
io::FileReaderSPtr _inner_reader;
// Owned by OrcReader
- OrcReader::Statistics* _statistics = nullptr;
const io::IOContext* _io_ctx = nullptr;
RuntimeProfile* _profile = nullptr;
};
@@ -735,21 +732,22 @@ private:
class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
public:
ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader,
- OrcReader::Statistics* statistics, const io::IOContext* io_ctx,
- RuntimeProfile* profile, int64_t orc_once_max_read_bytes,
- int64_t orc_max_merge_distance_bytes)
+ const io::IOContext* io_ctx, RuntimeProfile* profile,
+ int64_t orc_once_max_read_bytes, int64_t orc_max_merge_distance_bytes)
: _file_name(file_name),
_inner_reader(inner_reader),
_file_reader(inner_reader),
+ _tracing_file_reader(io_ctx ? std::make_shared<io::TracingFileReader>(
+ _file_reader, io_ctx->file_reader_stats)
+ : _file_reader),
_orc_once_max_read_bytes(orc_once_max_read_bytes),
_orc_max_merge_distance_bytes(orc_max_merge_distance_bytes),
- _statistics(statistics),
_io_ctx(io_ctx),
_profile(profile) {}
~ORCFileInputStream() override {
- if (_file_reader != nullptr) {
- _file_reader->collect_profile_before_close();
+ if (_tracing_file_reader != nullptr) {
+ _tracing_file_reader->collect_profile_before_close();
}
for (const auto& stripe_stream : _stripe_streams) {
if (stripe_stream != nullptr) {
@@ -759,7 +757,7 @@ public:
_stripe_streams.clear();
}
- uint64_t getLength() const override { return _file_reader->size(); }
+ uint64_t getLength() const override { return _tracing_file_reader->size(); }
uint64_t getNaturalReadSize() const override { return config::orc_natural_read_size_mb << 20; }
@@ -778,6 +776,8 @@ public:
io::FileReaderSPtr& get_inner_reader() { return _inner_reader; }
+ io::FileReaderSPtr& get_tracing_file_reader() { return _tracing_file_reader; }
+
protected:
void _collect_profile_at_runtime() override {};
void _collect_profile_before_close() override;
@@ -796,8 +796,16 @@ private:
std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams);
const std::string& _file_name;
+
+ // _inner_reader is the original file reader.
+ // _file_reader is the RangeCacheFileReader in the tiny-stripe case; otherwise
+ // _file_reader == _inner_reader.
+ // _tracing_file_reader is the tracing file reader that carries the io context.
+ // If io_ctx is null, _tracing_file_reader is the same as _file_reader.
io::FileReaderSPtr _inner_reader;
io::FileReaderSPtr _file_reader;
+ io::FileReaderSPtr _tracing_file_reader;
+
bool _is_all_tiny_stripes = false;
int64_t _orc_once_max_read_bytes;
int64_t _orc_max_merge_distance_bytes;
@@ -805,7 +813,6 @@ private:
std::vector<std::shared_ptr<StripeStreamInputStream>> _stripe_streams;
// Owned by OrcReader
- OrcReader::Statistics* _statistics = nullptr;
const io::IOContext* _io_ctx = nullptr;
RuntimeProfile* _profile = nullptr;
};
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 316b19701c7..331e9696250 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -32,6 +32,7 @@
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/tracing_file_reader.h"
#include "parquet_pred_cmp.h"
#include "parquet_thrift_util.h"
#include "runtime/define_primitive_type.h"
@@ -118,6 +119,12 @@ ParquetReader::~ParquetReader() {
_close_internal();
}
+// for unit test
+void ParquetReader::set_file_reader(io::FileReaderSPtr file_reader) {
+ _file_reader = file_reader;
+ _tracing_file_reader = file_reader;
+}
+
void ParquetReader::_init_profile() {
if (_profile != nullptr) {
static const char* parquet_profile = "ParquetReader";
@@ -157,14 +164,8 @@ void ParquetReader::_init_profile() {
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageIndexParseTime", parquet_profile, 1);
_parquet_profile.row_group_filter_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RowGroupFilterTime", parquet_profile, 1);
-
- _parquet_profile.file_read_time = ADD_TIMER_WITH_LEVEL(_profile, "FileReadTime", 1);
- _parquet_profile.file_read_calls =
- ADD_COUNTER_WITH_LEVEL(_profile, "FileReadCalls", TUnit::UNIT, 1);
_parquet_profile.file_meta_read_calls =
ADD_COUNTER_WITH_LEVEL(_profile, "FileMetaReadCalls", TUnit::UNIT, 1);
- _parquet_profile.file_read_bytes =
- ADD_COUNTER_WITH_LEVEL(_profile, "FileReadBytes", TUnit::BYTES, 1);
_parquet_profile.decompress_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecompressTime", parquet_profile, 1);
_parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL(
@@ -215,18 +216,22 @@ Status ParquetReader::_open_file() {
_file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
_profile, _system_properties, _file_description, reader_options,
io::DelegateReader::AccessMode::RANDOM, _io_ctx));
+ _tracing_file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(
+ _file_reader, _io_ctx->file_reader_stats)
+ : _file_reader;
}
if (_file_metadata == nullptr) {
SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
- if (_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) {
+ if (_tracing_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) {
// Some system may generate parquet file with only 4 bytes: PAR1
// Should consider it as empty file.
return Status::EndOfFile("open file failed, empty parquet file {} with size: {}",
- _scan_range.path, _file_reader->size());
+ _scan_range.path, _tracing_file_reader->size());
}
size_t meta_size = 0;
if (_meta_cache == nullptr) {
- auto st = parse_thrift_footer(_file_reader, &_file_metadata, &meta_size, _io_ctx);
+ auto st =
+ parse_thrift_footer(_tracing_file_reader, &_file_metadata, &meta_size, _io_ctx);
// wrap it with unique ptr, so that it can be released finally.
_file_metadata_ptr.reset(_file_metadata);
RETURN_IF_ERROR(st);
@@ -235,7 +240,7 @@ Status ParquetReader::_open_file() {
// parse magic number & parse meta data
_column_statistics.meta_read_calls += 1;
} else {
- RETURN_IF_ERROR(_meta_cache->get_parquet_footer(_file_reader, _io_ctx,
+ RETURN_IF_ERROR(_meta_cache->get_parquet_footer(_tracing_file_reader, _io_ctx,
_file_description.mtime, &meta_size,
&_meta_cache_handle));
_column_statistics.read_bytes += meta_size;
@@ -608,9 +613,12 @@ Status ParquetReader::_next_row_group_reader() {
_profile, _file_reader,
io_ranges)
: _file_reader;
}
- _current_group_reader.reset(new RowGroupReader(
- group_file_reader, _read_table_columns, row_group_index.row_group_id, row_group, _ctz,
- _io_ctx, position_delete_ctx, _lazy_read_ctx, _state));
+ _current_group_reader.reset(
+ new RowGroupReader(_io_ctx ? std::make_shared<io::TracingFileReader>(
+ group_file_reader, _io_ctx->file_reader_stats)
+ : group_file_reader,
+ _read_table_columns, row_group_index.row_group_id, row_group, _ctz,
+ _io_ctx, position_delete_ctx, _lazy_read_ctx, _state));
_row_group_eof = false;
_current_group_reader->_table_info_node_ptr = _table_info_node_ptr;
@@ -748,6 +756,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
std::function<void()> read_whole_row_group = [&]() {
candidate_row_ranges.emplace_back(0, row_group.num_rows);
_statistics.read_rows += row_group.num_rows;
+ if (_io_ctx) {
+ _io_ctx->file_reader_stats->read_rows += row_group.num_rows;
+ }
};
if ((!_enable_filter_by_min_max) || _lazy_read_ctx.has_complex_type ||
@@ -766,8 +777,8 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
Slice result(col_index_buff.data(), page_index._column_index_size);
{
SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
- RETURN_IF_ERROR(_file_reader->read_at(page_index._column_index_start, result, &bytes_read,
- _io_ctx));
+ RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._column_index_start, result,
+ &bytes_read, _io_ctx));
}
_column_statistics.read_bytes += bytes_read;
auto& schema_desc = _file_metadata->schema();
@@ -776,8 +787,8 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
Slice res(off_index_buff.data(), page_index._offset_index_size);
{
SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
- RETURN_IF_ERROR(
- _file_reader->read_at(page_index._offset_index_start, res, &bytes_read, _io_ctx));
+ RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._offset_index_start, res,
+ &bytes_read, _io_ctx));
}
_column_statistics.read_bytes += bytes_read;
// read twice: parse column index & parse offset index
@@ -856,6 +867,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
read_rows += row_group.num_rows - skip_end;
}
_statistics.read_rows += read_rows;
+ if (_io_ctx) {
+ _io_ctx->file_reader_stats->read_rows += read_rows;
+ }
_statistics.filtered_page_rows += row_group.num_rows - read_rows;
return Status::OK();
}
@@ -1006,10 +1020,7 @@ void ParquetReader::_collect_profile() {
_column_statistics.parse_page_header_num);
COUNTER_UPDATE(_parquet_profile.predicate_filter_time, _statistics.predicate_filter_time);
COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time);
- COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time);
- COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls);
COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, _column_statistics.meta_read_calls);
- COUNTER_UPDATE(_parquet_profile.file_read_bytes, _column_statistics.read_bytes);
COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time);
COUNTER_UPDATE(_parquet_profile.decompress_cnt, _column_statistics.decompress_cnt);
COUNTER_UPDATE(_parquet_profile.decode_header_time, _column_statistics.decode_header_time);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 814964833db..c560b1c3800 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -105,8 +105,8 @@ public:
io::IOContext* io_ctx, RuntimeState* state, bool enable_lazy_mat = true);
~ParquetReader() override;
- // for test
- void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader = file_reader; }
+ // for unit test
+ void set_file_reader(io::FileReaderSPtr file_reader);
Status init_reader(
const std::vector<std::string>& all_column_names,
@@ -153,6 +153,8 @@ public:
Status get_file_metadata_schema(const FieldDescriptor** ptr);
+ bool count_read_rows() override { return true; }
+
protected:
void _collect_profile_before_close() override;
@@ -176,10 +178,7 @@ private:
RuntimeProfile::Counter* read_page_index_time = nullptr;
RuntimeProfile::Counter* parse_page_index_time = nullptr;
- RuntimeProfile::Counter* file_read_time = nullptr;
- RuntimeProfile::Counter* file_read_calls = nullptr;
RuntimeProfile::Counter* file_meta_read_calls = nullptr;
- RuntimeProfile::Counter* file_read_bytes = nullptr;
RuntimeProfile::Counter* decompress_time = nullptr;
RuntimeProfile::Counter* decompress_cnt = nullptr;
RuntimeProfile::Counter* decode_header_time = nullptr;
@@ -244,7 +243,12 @@ private:
FileMetaData* _file_metadata = nullptr;
const tparquet::FileMetaData* _t_metadata = nullptr;
+ // _tracing_file_reader wraps _file_reader.
+ // _file_reader is the original file reader.
+ // _tracing_file_reader is the tracing file reader that carries the io context.
+ // If io_ctx is null, _tracing_file_reader is the same as _file_reader.
io::FileReaderSPtr _file_reader = nullptr;
+ io::FileReaderSPtr _tracing_file_reader = nullptr;
std::unique_ptr<RowGroupReader> _current_group_reader;
// read to the end of current reader
bool _row_group_eof = true;
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index fbba6a51033..1f79de54ed4 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -104,6 +104,8 @@ public:
virtual Status init_row_filters() = 0;
+ bool count_read_rows() override { return _file_format_reader->count_read_rows(); }
+
protected:
std::string _table_format; // hudi, iceberg, paimon
std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index be3b5d3d61b..e65a761a182 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -556,9 +556,18 @@ void NewOlapScanner::update_realtime_counters() {
// In case of no cache, we still need to update the IO stats. uncompressed bytes read == local + remote
if (stats.file_cache_stats.bytes_read_from_local == 0 &&
stats.file_cache_stats.bytes_read_from_remote == 0) {
+ if (_query_statistics) {
+ _query_statistics->add_scan_bytes_from_local_storage(stats.compressed_bytes_read);
+ }
DorisMetrics::instance()->query_scan_bytes_from_local->increment(
stats.compressed_bytes_read);
} else {
+ if (_query_statistics) {
+ _query_statistics->add_scan_bytes_from_local_storage(
+ stats.file_cache_stats.bytes_read_from_local);
+ _query_statistics->add_scan_bytes_from_remote_storage(
+ stats.file_cache_stats.bytes_read_from_remote);
+ }
DorisMetrics::instance()->query_scan_bytes_from_local->increment(
stats.file_cache_stats.bytes_read_from_local);
DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 0cba6b8a0c2..b744ae0d174 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -38,6 +38,7 @@
#include "common/logging.h"
#include "common/status.h"
#include "io/cache/block_file_cache_profile.h"
+#include "io/fs/tracing_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
@@ -146,14 +147,24 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
"NotFoundFileNum",
TUnit::UNIT, 1);
_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"FileNumber", TUnit::UNIT, 1);
+
+ _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
+ "FileReadBytes", TUnit::BYTES, 1);
+ _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
+ "FileReadCalls", TUnit::UNIT, 1);
+ _file_read_time_counter =
+ ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileReadTime", 1);
+
_runtime_filter_partition_pruned_range_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"RuntimeFilterPartitionPrunedRangeNum",
TUnit::UNIT, 1);
_file_cache_statistics.reset(new io::FileCacheStatistics());
+ _file_reader_stats.reset(new io::FileReaderStats());
+
_io_ctx.reset(new io::IOContext());
_io_ctx->file_cache_stats = _file_cache_statistics.get();
- _io_ctx->query_id = &_state->query_id();
+ _io_ctx->file_reader_stats = _file_reader_stats.get();
if (_is_load) {
_src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
@@ -435,6 +446,9 @@ Status VFileScanner::_get_block_wrapped(RuntimeState* state, Block* block, bool*
// use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr
// may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result.
if (read_rows > 0) {
+ if ((!_cur_reader->count_read_rows()) && _io_ctx) {
+ _io_ctx->file_reader_stats->read_rows += read_rows;
+ }
// If the push_down_agg_type is COUNT, no need to do the rest,
// because we only save a number in block.
if (_get_push_down_agg_type() != TPushAggOp::type::COUNT) {
@@ -1478,6 +1492,49 @@ void VFileScanner::try_stop() {
}
}
+void VFileScanner::update_realtime_counters() {
+ pipeline::FileScanLocalState* local_state =
+ static_cast<pipeline::FileScanLocalState*>(_local_state);
+
+ COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
+ COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
+
+ if (_query_statistics) {
+ _query_statistics->add_scan_rows(_file_reader_stats->read_rows);
+ _query_statistics->add_scan_bytes(_file_reader_stats->read_bytes);
+ }
+
+ if (_file_cache_statistics->bytes_read_from_local == 0 &&
+ _file_cache_statistics->bytes_read_from_remote == 0) {
+ if (_query_statistics) {
+ _query_statistics->add_scan_bytes_from_remote_storage(_file_reader_stats->read_bytes);
+ }
+ DorisMetrics::instance()->query_scan_bytes_from_local->increment(
+ _file_reader_stats->read_bytes);
+ } else {
+ if (_query_statistics) {
+ _query_statistics->add_scan_bytes_from_local_storage(
+ _file_cache_statistics->bytes_read_from_local);
+ _query_statistics->add_scan_bytes_from_remote_storage(
+ _file_cache_statistics->bytes_read_from_remote);
+ }
+ DorisMetrics::instance()->query_scan_bytes_from_local->increment(
+ _file_cache_statistics->bytes_read_from_local);
+ DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
+ _file_cache_statistics->bytes_read_from_remote);
+ }
+
+ COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
+
+ DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
+ DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
+
+ _file_reader_stats->read_bytes = 0;
+ _file_reader_stats->read_rows = 0;
+ _file_cache_statistics->bytes_read_from_local = 0;
+ _file_cache_statistics->bytes_read_from_remote = 0;
+}
+
void VFileScanner::_collect_profile_before_close() {
VScanner::_collect_profile_before_close();
if (config::enable_file_cache && _state->query_options().enable_file_cache &&
@@ -1489,6 +1546,18 @@ void VFileScanner::_collect_profile_before_close() {
if (_cur_reader != nullptr) {
_cur_reader->collect_profile_before_close();
}
+
+ pipeline::FileScanLocalState* local_state =
+ static_cast<pipeline::FileScanLocalState*>(_local_state);
+ COUNTER_UPDATE(local_state->_scan_bytes, _file_reader_stats->read_bytes);
+ COUNTER_UPDATE(local_state->_scan_rows, _file_reader_stats->read_rows);
+
+ COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
+ COUNTER_UPDATE(_file_read_calls_counter, _file_reader_stats->read_calls);
+ COUNTER_UPDATE(_file_read_time_counter, _file_reader_stats->read_time_ns);
+
+ DorisMetrics::instance()->query_scan_bytes->increment(_file_reader_stats->read_bytes);
+ DorisMetrics::instance()->query_scan_rows->increment(_file_reader_stats->read_rows);
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index d2e6a220a15..705f01962dc 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -81,6 +81,8 @@ public:
std::string get_current_scan_range_name() override { return _current_range_path; }
+ void update_realtime_counters() override;
+
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
@@ -165,6 +167,7 @@ protected:
Block _runtime_filter_partition_prune_block;
std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
+ std::unique_ptr<io::FileReaderStats> _file_reader_stats;
std::unique_ptr<io::IOContext> _io_ctx;
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
@@ -186,6 +189,9 @@ private:
RuntimeProfile::Counter* _empty_file_counter = nullptr;
RuntimeProfile::Counter* _not_found_file_counter = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
+ RuntimeProfile::Counter* _file_read_bytes_counter = nullptr;
+ RuntimeProfile::Counter* _file_read_calls_counter = nullptr;
+ RuntimeProfile::Counter* _file_read_time_counter = nullptr;
RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;
const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;