This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3940c799ce5 branch-4.0: [fix](profile) sort out parquet reader profile (#58895) (#59789)
3940c799ce5 is described below
commit 3940c799ce517b5d6b392a0b60162763615dfc15
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Jan 13 10:07:39 2026 +0800
branch-4.0: [fix](profile) sort out parquet reader profile (#58895) (#59789)
bp #58895
---
be/src/io/fs/buffered_reader.cpp | 3 -
be/src/io/fs/buffered_reader.h | 18 ---
be/src/io/fs/tracing_file_reader.h | 1 +
.../parquet/vparquet_column_chunk_reader.cpp | 14 +-
.../format/parquet/vparquet_column_chunk_reader.h | 17 ++-
.../exec/format/parquet/vparquet_column_reader.cpp | 6 +-
.../exec/format/parquet/vparquet_column_reader.h | 82 +++++-------
.../exec/format/parquet/vparquet_group_reader.cpp | 6 +-
.../exec/format/parquet/vparquet_group_reader.h | 2 +-
.../exec/format/parquet/vparquet_page_reader.cpp | 9 +-
.../vec/exec/format/parquet/vparquet_page_reader.h | 9 +-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 146 +++++++++++----------
be/src/vec/exec/format/parquet/vparquet_reader.h | 16 +--
be/src/vec/exec/scan/file_scanner.cpp | 18 ++-
be/src/vec/exec/scan/file_scanner.h | 2 +
15 files changed, 173 insertions(+), 176 deletions(-)
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index cae08d28417..62774160555 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -819,12 +819,10 @@ Status BufferedFileStreamReader::read_bytes(const
uint8_t** buf, uint64_t offset
int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset -
_buf_end_offset);
int64_t has_read = 0;
- SCOPED_RAW_TIMER(&_statistics.read_time);
while (has_read < to_read) {
size_t loop_read = 0;
Slice result(_buf.get() + buf_remaining + has_read, to_read -
has_read);
RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result,
&loop_read, io_ctx));
- _statistics.read_calls++;
if (loop_read == 0) {
break;
}
@@ -833,7 +831,6 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t**
buf, uint64_t offset
if (has_read != to_read) {
return Status::Corruption("Try to read {} bytes, but received {}
bytes", to_read, has_read);
}
- _statistics.read_bytes += to_read;
_buf_end_offset += to_read;
*buf = _buf.get();
return Status::OK();
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 6bcf634aef3..5fe07176235 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -225,7 +225,6 @@ public:
int64_t merged_io = 0;
int64_t request_bytes = 0;
int64_t merged_bytes = 0;
- int64_t apply_bytes = 0;
};
struct RangeCachedData {
@@ -299,9 +298,6 @@ public:
_merged_read_slice_size = READ_SLICE_SIZE;
}
- for (const PrefetchRange& range : _random_access_ranges) {
- _statistics.apply_bytes += range.end_offset - range.start_offset;
- }
if (_profile != nullptr) {
const char* random_profile = "MergedSmallIO";
ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
@@ -315,8 +311,6 @@ public:
random_profile, 1);
_merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile,
"MergedBytes", TUnit::BYTES,
random_profile, 1);
- _apply_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile,
"ApplyBytes", TUnit::BYTES,
- random_profile, 1);
}
}
@@ -359,7 +353,6 @@ protected:
COUNTER_UPDATE(_merged_io, _statistics.merged_io);
COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
- COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
if (_reader != nullptr) {
_reader->collect_profile_before_close();
}
@@ -373,7 +366,6 @@ private:
RuntimeProfile::Counter* _merged_io = nullptr;
RuntimeProfile::Counter* _request_bytes = nullptr;
RuntimeProfile::Counter* _merged_bytes = nullptr;
- RuntimeProfile::Counter* _apply_bytes = nullptr;
int _search_read_range(size_t start_offset, size_t end_offset);
void _clean_cached_data(RangeCachedData& cached_data);
@@ -619,12 +611,6 @@ private:
*/
class BufferedStreamReader {
public:
- struct Statistics {
- int64_t read_time = 0;
- int64_t read_calls = 0;
- int64_t read_bytes = 0;
- };
-
/**
* Return the address of underlying buffer that locates the start of data
between [offset, offset + bytes_to_read)
* @param buf the buffer address to save the start address of data
@@ -637,13 +623,9 @@ public:
* Save the data address to slice.data, and the slice.size is the bytes to
read.
*/
virtual Status read_bytes(Slice& slice, uint64_t offset, const IOContext*
io_ctx) = 0;
- Statistics& statistics() { return _statistics; }
virtual ~BufferedStreamReader() = default;
// return the file path
virtual std::string path() = 0;
-
-protected:
- Statistics _statistics;
};
class BufferedFileStreamReader : public BufferedStreamReader, public
ProfileCollector {
diff --git a/be/src/io/fs/tracing_file_reader.h
b/be/src/io/fs/tracing_file_reader.h
index 84eb3dfc8fb..39b70dfbb63 100644
--- a/be/src/io/fs/tracing_file_reader.h
+++ b/be/src/io/fs/tracing_file_reader.h
@@ -48,6 +48,7 @@ public:
void _collect_profile_before_close() override { return
_inner->collect_profile_before_close(); }
FileReaderStats* stats() const { return _stats; }
+ doris::io::FileReaderSPtr inner_reader() { return _inner; }
private:
doris::io::FileReaderSPtr _inner;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 74398e6da4a..621d8a2c505 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -189,8 +189,8 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::load_page_data() {
// check decompressed buffer size
_reserve_decompress_buf(uncompressed_size);
_page_data = Slice(_decompress_buf.get(), uncompressed_size);
- SCOPED_RAW_TIMER(&_statistics.decompress_time);
- _statistics.decompress_cnt++;
+ SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
+ _chunk_statistics.decompress_cnt++;
RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data,
&_page_data));
} else {
// Don't need decompress
@@ -205,7 +205,7 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::load_page_data() {
// Initialize repetition level and definition level. Skip when level = 0,
which means required field.
if (_max_rep_level > 0) {
- SCOPED_RAW_TIMER(&_statistics.decode_level_time);
+ SCOPED_RAW_TIMER(&_chunk_statistics.decode_level_time);
if (header->__isset.data_page_header_v2) {
RETURN_IF_ERROR(_rep_level_decoder.init_v2(_v2_rep_levels,
_max_rep_level,
_remaining_rep_nums));
@@ -216,7 +216,7 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::load_page_data() {
}
}
if (_max_def_level > 0) {
- SCOPED_RAW_TIMER(&_statistics.decode_level_time);
+ SCOPED_RAW_TIMER(&_chunk_statistics.decode_level_time);
if (header->__isset.data_page_header_v2) {
RETURN_IF_ERROR(_def_level_decoder.init_v2(_v2_def_levels,
_max_def_level,
_remaining_def_nums));
@@ -256,7 +256,7 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::_decode_dict_page() {
const tparquet::PageHeader* header = nullptr;
RETURN_IF_ERROR(_page_reader->get_page_header(header));
DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type);
- SCOPED_RAW_TIMER(&_statistics.decode_dict_time);
+ SCOPED_RAW_TIMER(&_chunk_statistics.decode_dict_time);
// Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0
specification.
// Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary
page for Parquet 2.0+ files.
@@ -315,7 +315,7 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::skip_values(size_t num_va
}
_remaining_num_values -= num_values;
if (skip_data) {
- SCOPED_RAW_TIMER(&_statistics.decode_value_time);
+ SCOPED_RAW_TIMER(&_chunk_statistics.decode_value_time);
return _page_decoder->skip_values(num_values);
} else {
return Status::OK();
@@ -329,7 +329,7 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::decode_values(
if (select_vector.num_values() == 0) {
return Status::OK();
}
- SCOPED_RAW_TIMER(&_statistics.decode_value_time);
+ SCOPED_RAW_TIMER(&_chunk_statistics.decode_value_time);
if (UNLIKELY((doris_column->is_column_dictionary() || is_dict_filter) &&
!_has_dict)) {
return Status::IOError("Not dictionary coded");
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 4022eac702b..1270e5e37fc 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -60,6 +60,7 @@ struct ColumnChunkReaderStatistics {
int64_t decode_level_time = 0;
int64_t skip_page_header_num = 0;
int64_t parse_page_header_num = 0;
+ int64_t read_page_header_time = 0;
};
/**
@@ -146,11 +147,15 @@ public:
// Get page decoder
Decoder* get_page_decoder() { return _page_decoder; }
- ColumnChunkReaderStatistics& statistics() {
- _statistics.decode_header_time =
_page_reader->statistics().decode_header_time;
- _statistics.skip_page_header_num =
_page_reader->statistics().skip_page_header_num;
- _statistics.parse_page_header_num =
_page_reader->statistics().parse_page_header_num;
- return _statistics;
+ ColumnChunkReaderStatistics& chunk_statistics() {
+ _chunk_statistics.decode_header_time =
_page_reader->page_statistics().decode_header_time;
+ _chunk_statistics.skip_page_header_num =
+ _page_reader->page_statistics().skip_page_header_num;
+ _chunk_statistics.parse_page_header_num =
+ _page_reader->page_statistics().parse_page_header_num;
+ _chunk_statistics.read_page_header_time =
+ _page_reader->page_statistics().read_page_header_time;
+ return _chunk_statistics;
}
Status read_dict_values_to_column(MutableColumnPtr& doris_column) {
@@ -238,7 +243,7 @@ private:
// Map: encoding -> Decoder
// Plain or Dictionary encoding. If the dictionary grows too big, the
encoding will fall back to the plain encoding
std::unordered_map<int, std::unique_ptr<Decoder>> _decoders;
- ColumnChunkReaderStatistics _statistics;
+ ColumnChunkReaderStatistics _chunk_statistics;
};
#include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 07b07255359..e24ec85b2e1 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -25,6 +25,7 @@
#include <algorithm>
#include <utility>
+#include "io/fs/tracing_file_reader.h"
#include "runtime/define_primitive_type.h"
#include "schema_desc.h"
#include "util/runtime_profile.h"
@@ -248,7 +249,10 @@ Status ScalarColumnReader<IN_COLLECTION,
OFFSET_INDEX>::init(io::FileReaderSPtr
:
chunk_meta.data_page_offset;
size_t chunk_len = chunk_meta.total_compressed_size;
size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
- if (typeid_cast<io::MergeRangeFileReader*>(file.get())) {
+ if ((typeid_cast<doris::io::TracingFileReader*>(file.get()) &&
+ typeid_cast<io::MergeRangeFileReader*>(
+
((doris::io::TracingFileReader*)(file.get()))->inner_reader().get())) ||
+ typeid_cast<io::MergeRangeFileReader*>(file.get())) {
// turn off prefetch data when using MergeRangeFileReader
prefetch_buffer_size = 0;
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 0e11f6f55fa..62ae4eb5fbe 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -53,12 +53,9 @@ using ColumnString = ColumnStr<UInt32>;
class ParquetColumnReader {
public:
- struct Statistics {
- Statistics()
- : read_time(0),
- read_calls(0),
- page_index_read_calls(0),
- read_bytes(0),
+ struct ColumnStatistics {
+ ColumnStatistics()
+ : page_index_read_calls(0),
decompress_time(0),
decompress_cnt(0),
decode_header_time(0),
@@ -67,14 +64,11 @@ public:
decode_level_time(0),
decode_null_map_time(0),
skip_page_header_num(0),
- parse_page_header_num(0) {}
-
- Statistics(io::BufferedStreamReader::Statistics& fs,
ColumnChunkReaderStatistics& cs,
- int64_t null_map_time)
- : read_time(fs.read_time),
- read_calls(fs.read_calls),
- page_index_read_calls(0),
- read_bytes(fs.read_bytes),
+ parse_page_header_num(0),
+ read_page_header_time(0) {}
+
+ ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t
null_map_time)
+ : page_index_read_calls(0),
decompress_time(cs.decompress_time),
decompress_cnt(cs.decompress_cnt),
decode_header_time(cs.decode_header_time),
@@ -83,12 +77,10 @@ public:
decode_level_time(cs.decode_level_time),
decode_null_map_time(null_map_time),
skip_page_header_num(cs.skip_page_header_num),
- parse_page_header_num(cs.parse_page_header_num) {}
+ parse_page_header_num(cs.parse_page_header_num),
+ read_page_header_time(cs.read_page_header_time) {}
- int64_t read_time;
- int64_t read_calls;
int64_t page_index_read_calls;
- int64_t read_bytes;
int64_t decompress_time;
int64_t decompress_cnt;
int64_t decode_header_time;
@@ -98,21 +90,20 @@ public:
int64_t decode_null_map_time;
int64_t skip_page_header_num;
int64_t parse_page_header_num;
-
- void merge(Statistics& statistics) {
- read_time += statistics.read_time;
- read_calls += statistics.read_calls;
- read_bytes += statistics.read_bytes;
- page_index_read_calls += statistics.page_index_read_calls;
- decompress_time += statistics.decompress_time;
- decompress_cnt += statistics.decompress_cnt;
- decode_header_time += statistics.decode_header_time;
- decode_value_time += statistics.decode_value_time;
- decode_dict_time += statistics.decode_dict_time;
- decode_level_time += statistics.decode_level_time;
- decode_null_map_time += statistics.decode_null_map_time;
- skip_page_header_num += statistics.skip_page_header_num;
- parse_page_header_num += statistics.parse_page_header_num;
+ int64_t read_page_header_time;
+
+ void merge(ColumnStatistics& col_statistics) {
+ page_index_read_calls += col_statistics.page_index_read_calls;
+ decompress_time += col_statistics.decompress_time;
+ decompress_cnt += col_statistics.decompress_cnt;
+ decode_header_time += col_statistics.decode_header_time;
+ decode_value_time += col_statistics.decode_value_time;
+ decode_dict_time += col_statistics.decode_dict_time;
+ decode_level_time += col_statistics.decode_level_time;
+ decode_null_map_time += col_statistics.decode_null_map_time;
+ skip_page_header_num += col_statistics.skip_page_header_num;
+ parse_page_header_num += col_statistics.parse_page_header_num;
+ read_page_header_time += col_statistics.read_page_header_time;
}
};
@@ -145,7 +136,7 @@ public:
const std::set<uint64_t>& filter_column_ids = {});
virtual const std::vector<level_t>& get_rep_level() const = 0;
virtual const std::vector<level_t>& get_def_level() const = 0;
- virtual Statistics statistics() = 0;
+ virtual ColumnStatistics column_statistics() = 0;
virtual void close() = 0;
virtual void reset_filter_map_index() = 0;
@@ -188,9 +179,8 @@ public:
MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32*
dict_column) override;
const std::vector<level_t>& get_rep_level() const override { return
_rep_levels; }
const std::vector<level_t>& get_def_level() const override { return
_def_levels; }
- Statistics statistics() override {
- return Statistics(_stream_reader->statistics(),
_chunk_reader->statistics(),
- _decode_null_map_time);
+ ColumnStatistics column_statistics() override {
+ return ColumnStatistics(_chunk_reader->chunk_statistics(),
_decode_null_map_time);
}
void close() override {}
@@ -304,7 +294,7 @@ public:
const std::vector<level_t>& get_def_level() const override {
return _element_reader->get_def_level();
}
- Statistics statistics() override { return _element_reader->statistics(); }
+ ColumnStatistics column_statistics() override { return
_element_reader->column_statistics(); }
void close() override {}
void reset_filter_map_index() override {
_element_reader->reset_filter_map_index(); }
@@ -335,9 +325,9 @@ public:
return _key_reader->get_def_level();
}
- Statistics statistics() override {
- Statistics kst = _key_reader->statistics();
- Statistics vst = _value_reader->statistics();
+ ColumnStatistics column_statistics() override {
+ ColumnStatistics kst = _key_reader->column_statistics();
+ ColumnStatistics vst = _value_reader->column_statistics();
kst.merge(vst);
return kst;
}
@@ -392,12 +382,12 @@ public:
return _child_readers.begin()->second->get_def_level();
}
- Statistics statistics() override {
- Statistics st;
+ ColumnStatistics column_statistics() override {
+ ColumnStatistics st;
for (const auto& column_name : _read_column_names) {
auto reader = _child_readers.find(column_name);
if (reader != _child_readers.end()) {
- Statistics cst = reader->second->statistics();
+ ColumnStatistics cst = reader->second->column_statistics();
st.merge(cst);
}
}
@@ -490,8 +480,8 @@ public:
}
// Implement required pure virtual methods from base class
- Statistics statistics() override {
- return Statistics(); // Return empty statistics
+ ColumnStatistics column_statistics() override {
+ return ColumnStatistics(); // Return empty statistics
}
void close() override {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index cd815e0091b..b9f5e983aad 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -1125,10 +1125,10 @@ void
RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
}
}
-ParquetColumnReader::Statistics RowGroupReader::statistics() {
- ParquetColumnReader::Statistics st;
+ParquetColumnReader::ColumnStatistics
RowGroupReader::merged_column_statistics() {
+ ParquetColumnReader::ColumnStatistics st;
for (auto& reader : _column_readers) {
- auto ost = reader.second->statistics();
+ auto ost = reader.second->column_statistics();
st.merge(ost);
}
return st;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index b3b1123f82e..8f81e633146 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -173,7 +173,7 @@ public:
int64_t predicate_filter_time() const { return _predicate_filter_time; }
int64_t dict_filter_rewrite_time() const { return
_dict_filter_rewrite_time; }
- ParquetColumnReader::Statistics statistics();
+ ParquetColumnReader::ColumnStatistics merged_column_statistics();
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
int64_t get_remaining_rows() { return _remaining_rows; }
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 3734dc217f5..3b6d7fdcb9b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -87,9 +87,12 @@ Status PageReader<IN_COLLECTION,
OFFSET_INDEX>::parse_page_header() {
return Status::EndOfFile("stop");
}
header_size = std::min(header_size, max_size);
- RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset,
header_size, _io_ctx));
+ {
+ SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
+ RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset,
header_size, _io_ctx));
+ }
real_header_size = cast_set<uint32_t>(header_size);
- SCOPED_RAW_TIMER(&_statistics.decode_header_time);
+ SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
auto st =
deserialize_thrift_msg(page_header_buf, &real_header_size,
true, &_cur_page_header);
if (st.ok()) {
@@ -112,7 +115,7 @@ Status PageReader<IN_COLLECTION,
OFFSET_INDEX>::parse_page_header() {
}
}
- _statistics.parse_page_header_num++;
+ _page_statistics.parse_page_header_num++;
_offset += real_header_size;
_next_header_offset = _offset + _cur_page_header.compressed_page_size;
_state = HEADER_PARSED;
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index c33a7ca8cdb..9246819d59c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -53,10 +53,11 @@ namespace doris::vectorized {
template <bool IN_COLLECTION, bool OFFSET_INDEX>
class PageReader {
public:
- struct Statistics {
+ struct PageStatistics {
int64_t decode_header_time = 0;
int64_t skip_page_header_num = 0;
int64_t parse_page_header_num = 0;
+ int64_t read_page_header_time = 0;
};
PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx,
uint64_t offset,
@@ -82,7 +83,7 @@ public:
Status parse_page_header();
Status next_page() {
- _statistics.skip_page_header_num += _state == INITIALIZED;
+ _page_statistics.skip_page_header_num += _state == INITIALIZED;
if constexpr (OFFSET_INDEX) {
_page_index++;
_start_row =
_offset_index->page_locations[_page_index].first_row_index;
@@ -132,7 +133,7 @@ public:
Status get_page_data(Slice& slice);
- Statistics& statistics() { return _statistics; }
+ PageStatistics& page_statistics() { return _page_statistics; }
bool is_header_v2() { return _cur_page_header.__isset.data_page_header_v2;
}
@@ -143,7 +144,7 @@ public:
private:
enum PageReaderState { INITIALIZED, HEADER_PARSED, DATA_LOADED };
PageReaderState _state = INITIALIZED;
- Statistics _statistics;
+ PageStatistics _page_statistics;
io::BufferedStreamReader* _reader = nullptr;
io::IOContext* _io_ctx = nullptr;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 3eb04608f73..0b49a7efce3 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -147,13 +147,15 @@ void ParquetReader::_init_profile() {
ADD_TIMER_WITH_LEVEL(_profile, parquet_profile, 1);
_parquet_profile.filtered_row_groups = ADD_CHILD_COUNTER_WITH_LEVEL(
- _profile, "FilteredGroups", TUnit::UNIT, parquet_profile, 1);
+ _profile, "RowGroupsFiltered", TUnit::UNIT, parquet_profile,
1);
_parquet_profile.filtered_row_groups_by_min_max =
ADD_CHILD_COUNTER_WITH_LEVEL(
- _profile, "FilteredGroupsByMinMax", TUnit::UNIT,
parquet_profile, 1);
+ _profile, "RowGroupsFilteredByMinMax", TUnit::UNIT,
parquet_profile, 1);
_parquet_profile.filtered_row_groups_by_bloom_filter =
ADD_CHILD_COUNTER_WITH_LEVEL(
- _profile, "FilteredGroupsByBloomFilter", TUnit::UNIT,
parquet_profile, 1);
+ _profile, "RowGroupsFilteredByBloomFilter", TUnit::UNIT,
parquet_profile, 1);
_parquet_profile.to_read_row_groups = ADD_CHILD_COUNTER_WITH_LEVEL(
- _profile, "ReadGroups", TUnit::UNIT, parquet_profile, 1);
+ _profile, "RowGroupsReadNum", TUnit::UNIT, parquet_profile, 1);
+ _parquet_profile.total_row_groups = ADD_CHILD_COUNTER_WITH_LEVEL(
+ _profile, "RowGroupsTotalNum", TUnit::UNIT, parquet_profile,
1);
_parquet_profile.filtered_group_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "FilteredRowsByGroup", TUnit::UNIT, parquet_profile,
1);
_parquet_profile.filtered_page_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
@@ -164,16 +166,14 @@ void ParquetReader::_init_profile() {
_profile, "FilteredBytes", TUnit::BYTES, parquet_profile, 1);
_parquet_profile.raw_rows_read = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "RawRowsRead", TUnit::UNIT, parquet_profile, 1);
- _parquet_profile.to_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(
- _profile, "ReadBytes", TUnit::BYTES, parquet_profile, 1);
_parquet_profile.column_read_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ColumnReadTime",
parquet_profile, 1);
_parquet_profile.parse_meta_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ParseMetaTime",
parquet_profile, 1);
_parquet_profile.parse_footer_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ParseFooterTime",
parquet_profile, 1);
- _parquet_profile.open_file_time =
- ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FileOpenTime",
parquet_profile, 1);
+ _parquet_profile.file_reader_create_time =
+ ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FileReaderCreateTime",
parquet_profile, 1);
_parquet_profile.open_file_num =
ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "FileNum", TUnit::UNIT,
parquet_profile, 1);
_parquet_profile.page_index_read_calls =
@@ -195,7 +195,9 @@ void ParquetReader::_init_profile() {
_parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "DecompressCount", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.decode_header_time =
- ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeHeaderTime",
parquet_profile, 1);
+ ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageHeaderDecodeTime",
parquet_profile, 1);
+ _parquet_profile.read_page_header_time =
+ ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageHeaderReadTime",
parquet_profile, 1);
_parquet_profile.decode_value_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime",
parquet_profile, 1);
_parquet_profile.decode_dict_time =
@@ -233,8 +235,8 @@ Status ParquetReader::_open_file() {
return Status::EndOfFile("stop");
}
if (_file_reader == nullptr) {
- SCOPED_RAW_TIMER(&_statistics.open_file_time);
- ++_statistics.open_file_num;
+ SCOPED_RAW_TIMER(&_reader_statistics.file_reader_create_time);
+ ++_reader_statistics.open_file_num;
_file_description.mtime =
_scan_range.__isset.modification_time ?
_scan_range.modification_time : 0;
io::FileReaderOptions reader_options =
@@ -248,7 +250,7 @@ Status ParquetReader::_open_file() {
}
if (_file_metadata == nullptr) {
- SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
+ SCOPED_RAW_TIMER(&_reader_statistics.parse_footer_time);
if (_tracing_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) {
// Some system may generate parquet file with only 4 bytes: PAR1
// Should consider it as empty file.
@@ -264,9 +266,8 @@ Status ParquetReader::_open_file() {
RETURN_IF_ERROR(parse_thrift_footer(_tracing_file_reader,
&_file_metadata_ptr,
&meta_size, _io_ctx,
enable_mapping_varbinary));
_file_metadata = _file_metadata_ptr.get();
- _column_statistics.read_bytes += meta_size;
// parse magic number & parse meta data
- _statistics.file_footer_read_calls += 1;
+ _reader_statistics.file_footer_read_calls += 1;
} else {
const auto& file_meta_cache_key =
FileMetaCache::get_key(_tracing_file_reader,
_file_description);
@@ -277,10 +278,9 @@ Status ParquetReader::_open_file() {
_meta_cache->insert(file_meta_cache_key,
_file_metadata_ptr.release(),
&_meta_cache_handle);
_file_metadata = _meta_cache_handle.data<FileMetaData>();
- _column_statistics.read_bytes += meta_size;
- _statistics.file_footer_read_calls += 1;
+ _reader_statistics.file_footer_read_calls += 1;
} else {
- _statistics.file_footer_hit_cache++;
+ _reader_statistics.file_footer_hit_cache++;
}
_file_metadata = _meta_cache_handle.data<FileMetaData>();
}
@@ -289,9 +289,6 @@ Status ParquetReader::_open_file() {
return Status::InternalError("failed to get file meta data: {}",
_file_description.path);
}
- _column_statistics.read_bytes += meta_size;
- // parse magic number & parse meta data
- _column_statistics.read_calls += 1;
}
return Status::OK();
}
@@ -353,7 +350,7 @@ Status ParquetReader::init_reader(
return Status::InternalError("failed to init parquet reader, please
open reader first");
}
- SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+ SCOPED_RAW_TIMER(&_reader_statistics.parse_meta_time);
_total_groups = _t_metadata->row_groups.size();
if (_total_groups == 0) {
return Status::EndOfFile("init reader failed, empty parquet file: " +
_scan_range.path);
@@ -651,7 +648,7 @@ Status ParquetReader::get_next_block(Block* block, size_t*
read_rows, bool* eof)
return Status::OK();
}
- SCOPED_RAW_TIMER(&_statistics.column_read_time);
+ SCOPED_RAW_TIMER(&_reader_statistics.column_read_time);
Status batch_st =
_current_group_reader->next_batch(block, _batch_size, read_rows,
&_row_group_eof);
if (batch_st.is<ErrorCode::END_OF_FILE>()) {
@@ -668,11 +665,13 @@ Status ParquetReader::get_next_block(Block* block,
size_t* read_rows, bool* eof)
}
if (_row_group_eof) {
- auto column_st = _current_group_reader->statistics();
+ auto column_st = _current_group_reader->merged_column_statistics();
_column_statistics.merge(column_st);
- _statistics.lazy_read_filtered_rows +=
_current_group_reader->lazy_read_filtered_rows();
- _statistics.predicate_filter_time +=
_current_group_reader->predicate_filter_time();
- _statistics.dict_filter_rewrite_time +=
_current_group_reader->dict_filter_rewrite_time();
+ _reader_statistics.lazy_read_filtered_rows +=
+ _current_group_reader->lazy_read_filtered_rows();
+ _reader_statistics.predicate_filter_time +=
_current_group_reader->predicate_filter_time();
+ _reader_statistics.dict_filter_rewrite_time +=
+ _current_group_reader->dict_filter_rewrite_time();
if (_current_row_group_index.row_group_id + 1 == _total_groups) {
*eof = true;
} else {
@@ -764,23 +763,22 @@ Status ParquetReader::_next_row_group_reader() {
group_size += column_compressed_size(field);
}
- _statistics.read_rows += candidate_row_ranges.count();
+ _reader_statistics.read_rows += candidate_row_ranges.count();
if (_io_ctx) {
_io_ctx->file_reader_stats->read_rows +=
candidate_row_ranges.count();
}
if (candidate_row_ranges.count() != 0) {
// need read this row group.
- _statistics.read_row_groups++;
- _statistics.read_bytes += group_size;
-
- _statistics.filtered_page_rows += row_group.num_rows -
candidate_row_ranges.count();
+ _reader_statistics.read_row_groups++;
+ _reader_statistics.filtered_page_rows +=
+ row_group.num_rows - candidate_row_ranges.count();
break;
} else {
// this row group be filtered.
- _statistics.filtered_row_groups++;
- _statistics.filtered_bytes += group_size;
- _statistics.filtered_group_rows += row_group.num_rows;
+ _reader_statistics.filtered_row_groups++;
+ _reader_statistics.filtered_bytes += group_size;
+ _reader_statistics.filtered_group_rows += row_group.num_rows;
}
}
@@ -947,11 +945,10 @@ Status ParquetReader::_process_page_index_filter(
Slice res(off_index_buff.data(), page_index._offset_index_size);
size_t bytes_read = 0;
{
- SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
+ SCOPED_RAW_TIMER(&_reader_statistics.read_page_index_time);
RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._offset_index_start,
res,
&bytes_read,
_io_ctx));
}
- _column_statistics.read_bytes += bytes_read;
_column_statistics.page_index_read_calls++;
_col_offsets.clear();
@@ -961,7 +958,7 @@ Status ParquetReader::_process_page_index_filter(
continue;
}
tparquet::OffsetIndex offset_index;
- SCOPED_RAW_TIMER(&_statistics.parse_page_index_time);
+ SCOPED_RAW_TIMER(&_reader_statistics.parse_page_index_time);
RETURN_IF_ERROR(
page_index.parse_offset_index(chunk,
off_index_buff.data(), &offset_index));
_col_offsets[parquet_col_id] = offset_index;
@@ -983,14 +980,13 @@ Status ParquetReader::_process_page_index_filter(
size_t bytes_read = 0;
Slice result(col_index_buff.data(), page_index._column_index_size);
{
- SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
+ SCOPED_RAW_TIMER(&_reader_statistics.read_page_index_time);
RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._column_index_start,
result,
&bytes_read, _io_ctx));
}
- _column_statistics.read_bytes += bytes_read;
_column_statistics.page_index_read_calls++;
- SCOPED_RAW_TIMER(&_statistics.page_index_filter_time);
+ SCOPED_RAW_TIMER(&_reader_statistics.page_index_filter_time);
// Construct a cacheable page index structure to avoid repeatedly reading
the page index of the same column.
ParquetPredicate::CachedPageIndexStat cached_page_index;
@@ -1033,7 +1029,7 @@ Status ParquetReader::_process_page_index_filter(
tparquet::ColumnIndex column_index;
{
- SCOPED_RAW_TIMER(&_statistics.parse_page_index_time);
+ SCOPED_RAW_TIMER(&_reader_statistics.parse_page_index_time);
RETURN_IF_ERROR(page_index.parse_column_index(column_chunk,
col_index_buff.data(),
&column_index));
}
@@ -1095,7 +1091,7 @@ Status ParquetReader::_process_min_max_bloom_filter(
const RowGroupReader::RowGroupIndex& row_group_index, const
tparquet::RowGroup& row_group,
const std::vector<std::unique_ptr<MutilColumnBlockPredicate>>&
push_down_pred,
RowRanges* row_ranges) {
- SCOPED_RAW_TIMER(&_statistics.row_group_filter_time);
+ SCOPED_RAW_TIMER(&_reader_statistics.row_group_filter_time);
if (!_filter_groups) {
// No row group filtering is needed;
// for example, Iceberg reads position delete files.
@@ -1128,10 +1124,10 @@ Status ParquetReader::_process_min_max_bloom_filter(
// Update statistics based on filter type
if (filter_this_row_group) {
if (filtered_by_min_max) {
- _statistics.filtered_row_groups_by_min_max++;
+ _reader_statistics.filtered_row_groups_by_min_max++;
}
if (filtered_by_bloom_filter) {
- _statistics.filtered_row_groups_by_bloom_filter++;
+ _reader_statistics.filtered_row_groups_by_bloom_filter++;
}
}
@@ -1221,7 +1217,7 @@ Status ParquetReader::_process_column_stat_filter(
}
if (!stat->bloom_filter) {
- SCOPED_RAW_TIMER(&_statistics.bloom_filter_read_time);
+
SCOPED_RAW_TIMER(&_reader_statistics.bloom_filter_read_time);
auto st = ParquetPredicate::read_bloom_filter(
meta_data, _tracing_file_reader, _io_ctx,
stat);
if (!st.ok()) {
@@ -1291,41 +1287,53 @@ void ParquetReader::_collect_profile() {
if (_current_group_reader != nullptr) {
_current_group_reader->collect_profile_before_close();
}
- COUNTER_UPDATE(_parquet_profile.filtered_row_groups,
_statistics.filtered_row_groups);
+ COUNTER_UPDATE(_parquet_profile.filtered_row_groups,
_reader_statistics.filtered_row_groups);
COUNTER_UPDATE(_parquet_profile.filtered_row_groups_by_min_max,
- _statistics.filtered_row_groups_by_min_max);
+ _reader_statistics.filtered_row_groups_by_min_max);
COUNTER_UPDATE(_parquet_profile.filtered_row_groups_by_bloom_filter,
- _statistics.filtered_row_groups_by_bloom_filter);
- COUNTER_UPDATE(_parquet_profile.to_read_row_groups,
_statistics.read_row_groups);
- COUNTER_UPDATE(_parquet_profile.filtered_group_rows,
_statistics.filtered_group_rows);
- COUNTER_UPDATE(_parquet_profile.filtered_page_rows,
_statistics.filtered_page_rows);
- COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows,
_statistics.lazy_read_filtered_rows);
- COUNTER_UPDATE(_parquet_profile.filtered_bytes,
_statistics.filtered_bytes);
- COUNTER_UPDATE(_parquet_profile.raw_rows_read, _statistics.read_rows);
- COUNTER_UPDATE(_parquet_profile.to_read_bytes, _statistics.read_bytes);
- COUNTER_UPDATE(_parquet_profile.column_read_time,
_statistics.column_read_time);
- COUNTER_UPDATE(_parquet_profile.parse_meta_time,
_statistics.parse_meta_time);
- COUNTER_UPDATE(_parquet_profile.parse_footer_time,
_statistics.parse_footer_time);
- COUNTER_UPDATE(_parquet_profile.open_file_time,
_statistics.open_file_time);
- COUNTER_UPDATE(_parquet_profile.open_file_num, _statistics.open_file_num);
- COUNTER_UPDATE(_parquet_profile.page_index_filter_time,
_statistics.page_index_filter_time);
- COUNTER_UPDATE(_parquet_profile.read_page_index_time,
_statistics.read_page_index_time);
- COUNTER_UPDATE(_parquet_profile.parse_page_index_time,
_statistics.parse_page_index_time);
- COUNTER_UPDATE(_parquet_profile.row_group_filter_time,
_statistics.row_group_filter_time);
- COUNTER_UPDATE(_parquet_profile.file_footer_read_calls,
_statistics.file_footer_read_calls);
- COUNTER_UPDATE(_parquet_profile.file_footer_hit_cache,
_statistics.file_footer_hit_cache);
+ _reader_statistics.filtered_row_groups_by_bloom_filter);
+ COUNTER_UPDATE(_parquet_profile.to_read_row_groups,
_reader_statistics.read_row_groups);
+ COUNTER_UPDATE(_parquet_profile.total_row_groups, _total_groups);
+ COUNTER_UPDATE(_parquet_profile.filtered_group_rows,
_reader_statistics.filtered_group_rows);
+ COUNTER_UPDATE(_parquet_profile.filtered_page_rows,
_reader_statistics.filtered_page_rows);
+ COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows,
+ _reader_statistics.lazy_read_filtered_rows);
+ COUNTER_UPDATE(_parquet_profile.filtered_bytes,
_reader_statistics.filtered_bytes);
+ COUNTER_UPDATE(_parquet_profile.raw_rows_read,
_reader_statistics.read_rows);
+ COUNTER_UPDATE(_parquet_profile.column_read_time,
_reader_statistics.column_read_time);
+ COUNTER_UPDATE(_parquet_profile.parse_meta_time,
_reader_statistics.parse_meta_time);
+ COUNTER_UPDATE(_parquet_profile.parse_footer_time,
_reader_statistics.parse_footer_time);
+ COUNTER_UPDATE(_parquet_profile.file_reader_create_time,
+ _reader_statistics.file_reader_create_time);
+ COUNTER_UPDATE(_parquet_profile.open_file_num,
_reader_statistics.open_file_num);
+ COUNTER_UPDATE(_parquet_profile.page_index_filter_time,
+ _reader_statistics.page_index_filter_time);
+ COUNTER_UPDATE(_parquet_profile.read_page_index_time,
_reader_statistics.read_page_index_time);
+ COUNTER_UPDATE(_parquet_profile.parse_page_index_time,
+ _reader_statistics.parse_page_index_time);
+ COUNTER_UPDATE(_parquet_profile.row_group_filter_time,
+ _reader_statistics.row_group_filter_time);
+ COUNTER_UPDATE(_parquet_profile.file_footer_read_calls,
+ _reader_statistics.file_footer_read_calls);
+ COUNTER_UPDATE(_parquet_profile.file_footer_hit_cache,
+ _reader_statistics.file_footer_hit_cache);
COUNTER_UPDATE(_parquet_profile.skip_page_header_num,
_column_statistics.skip_page_header_num);
COUNTER_UPDATE(_parquet_profile.parse_page_header_num,
_column_statistics.parse_page_header_num);
- COUNTER_UPDATE(_parquet_profile.predicate_filter_time,
_statistics.predicate_filter_time);
- COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time,
_statistics.dict_filter_rewrite_time);
- COUNTER_UPDATE(_parquet_profile.bloom_filter_read_time,
_statistics.bloom_filter_read_time);
+ COUNTER_UPDATE(_parquet_profile.predicate_filter_time,
+ _reader_statistics.predicate_filter_time);
+ COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time,
+ _reader_statistics.dict_filter_rewrite_time);
+ COUNTER_UPDATE(_parquet_profile.bloom_filter_read_time,
+ _reader_statistics.bloom_filter_read_time);
COUNTER_UPDATE(_parquet_profile.page_index_read_calls,
_column_statistics.page_index_read_calls);
COUNTER_UPDATE(_parquet_profile.decompress_time,
_column_statistics.decompress_time);
COUNTER_UPDATE(_parquet_profile.decompress_cnt,
_column_statistics.decompress_cnt);
COUNTER_UPDATE(_parquet_profile.decode_header_time,
_column_statistics.decode_header_time);
+ COUNTER_UPDATE(_parquet_profile.read_page_header_time,
+ _column_statistics.read_page_header_time);
COUNTER_UPDATE(_parquet_profile.decode_value_time,
_column_statistics.decode_value_time);
COUNTER_UPDATE(_parquet_profile.decode_dict_time,
_column_statistics.decode_dict_time);
COUNTER_UPDATE(_parquet_profile.decode_level_time,
_column_statistics.decode_level_time);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 31a39b442fc..e02266d944b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -74,7 +74,7 @@ class ParquetReader : public GenericReader, public
ExprPushDownHelper {
ENABLE_FACTORY_CREATOR(ParquetReader);
public:
- struct Statistics {
+ struct ReaderStatistics {
int32_t filtered_row_groups = 0;
int32_t filtered_row_groups_by_min_max = 0;
int32_t filtered_row_groups_by_bloom_filter = 0;
@@ -84,13 +84,12 @@ public:
int64_t lazy_read_filtered_rows = 0;
int64_t read_rows = 0;
int64_t filtered_bytes = 0;
- int64_t read_bytes = 0;
int64_t column_read_time = 0;
int64_t parse_meta_time = 0;
int64_t parse_footer_time = 0;
int64_t file_footer_read_calls = 0;
int64_t file_footer_hit_cache = 0;
- int64_t open_file_time = 0;
+ int64_t file_reader_create_time = 0;
int64_t open_file_num = 0;
int64_t row_group_filter_time = 0;
int64_t page_index_filter_time = 0;
@@ -146,7 +145,7 @@ public:
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) override;
- Statistics& statistics() { return _statistics; }
+ ReaderStatistics& reader_statistics() { return _reader_statistics; }
const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }
@@ -178,16 +177,16 @@ private:
RuntimeProfile::Counter* filtered_row_groups_by_min_max = nullptr;
RuntimeProfile::Counter* filtered_row_groups_by_bloom_filter = nullptr;
RuntimeProfile::Counter* to_read_row_groups = nullptr;
+ RuntimeProfile::Counter* total_row_groups = nullptr;
RuntimeProfile::Counter* filtered_group_rows = nullptr;
RuntimeProfile::Counter* filtered_page_rows = nullptr;
RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
RuntimeProfile::Counter* filtered_bytes = nullptr;
RuntimeProfile::Counter* raw_rows_read = nullptr;
- RuntimeProfile::Counter* to_read_bytes = nullptr;
RuntimeProfile::Counter* column_read_time = nullptr;
RuntimeProfile::Counter* parse_meta_time = nullptr;
RuntimeProfile::Counter* parse_footer_time = nullptr;
- RuntimeProfile::Counter* open_file_time = nullptr;
+ RuntimeProfile::Counter* file_reader_create_time = nullptr;
RuntimeProfile::Counter* open_file_num = nullptr;
RuntimeProfile::Counter* row_group_filter_time = nullptr;
RuntimeProfile::Counter* page_index_read_calls = nullptr;
@@ -199,6 +198,7 @@ private:
RuntimeProfile::Counter* decompress_time = nullptr;
RuntimeProfile::Counter* decompress_cnt = nullptr;
RuntimeProfile::Counter* decode_header_time = nullptr;
+ RuntimeProfile::Counter* read_page_header_time = nullptr;
RuntimeProfile::Counter* decode_value_time = nullptr;
RuntimeProfile::Counter* decode_dict_time = nullptr;
RuntimeProfile::Counter* decode_level_time = nullptr;
@@ -322,8 +322,8 @@ private:
// _table_column_names = _missing_cols + _read_table_columns
const std::vector<std::string>* _table_column_names = nullptr;
- Statistics _statistics;
- ParquetColumnReader::Statistics _column_statistics;
+ ReaderStatistics _reader_statistics;
+ ParquetColumnReader::ColumnStatistics _column_statistics;
ParquetProfile _parquet_profile;
bool _closed = false;
io::IOContext* _io_ctx = nullptr;
diff --git a/be/src/vec/exec/scan/file_scanner.cpp
b/be/src/vec/exec/scan/file_scanner.cpp
index 8a0f296b2b9..6eea3334c47 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -1792,6 +1792,10 @@ void FileScanner::update_realtime_counters() {
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes(
_file_reader_stats->read_bytes);
+ int64_t delta_bytes_read_from_local =
+ _file_cache_statistics->bytes_read_from_local -
_last_bytes_read_from_local;
+ int64_t delta_bytes_read_from_remote =
+ _file_cache_statistics->bytes_read_from_remote -
_last_bytes_read_from_remote;
if (_file_cache_statistics->bytes_read_from_local == 0 &&
_file_cache_statistics->bytes_read_from_remote == 0) {
_state->get_query_ctx()
@@ -1802,16 +1806,15 @@ void FileScanner::update_realtime_counters() {
_file_reader_stats->read_bytes);
} else {
_state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage(
- _file_cache_statistics->bytes_read_from_local);
+ delta_bytes_read_from_local);
_state->get_query_ctx()
->resource_ctx()
->io_context()
- ->update_scan_bytes_from_remote_storage(
- _file_cache_statistics->bytes_read_from_remote);
+
->update_scan_bytes_from_remote_storage(delta_bytes_read_from_remote);
DorisMetrics::instance()->query_scan_bytes_from_local->increment(
- _file_cache_statistics->bytes_read_from_local);
+ delta_bytes_read_from_local);
DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
- _file_cache_statistics->bytes_read_from_remote);
+ delta_bytes_read_from_remote);
}
COUNTER_UPDATE(_file_read_bytes_counter, _file_reader_stats->read_bytes);
@@ -1821,8 +1824,9 @@ void FileScanner::update_realtime_counters() {
_file_reader_stats->read_bytes = 0;
_file_reader_stats->read_rows = 0;
- _file_cache_statistics->bytes_read_from_local = 0;
- _file_cache_statistics->bytes_read_from_remote = 0;
+
+ _last_bytes_read_from_local =
_file_cache_statistics->bytes_read_from_local;
+ _last_bytes_read_from_remote =
_file_cache_statistics->bytes_read_from_remote;
}
void FileScanner::_collect_profile_before_close() {
diff --git a/be/src/vec/exec/scan/file_scanner.h
b/be/src/vec/exec/scan/file_scanner.h
index 379f6a246f6..835da10eb27 100644
--- a/be/src/vec/exec/scan/file_scanner.h
+++ b/be/src/vec/exec/scan/file_scanner.h
@@ -234,6 +234,8 @@ private:
std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int>
_row_id_column_iterator_pair = {nullptr,
-1};
+ int64_t _last_bytes_read_from_local = 0;
+ int64_t _last_bytes_read_from_remote = 0;
private:
Status _init_expr_ctxes();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]