This is an automated email from the ASF dual-hosted git repository. kakachen pushed a commit to branch tpc_preview4-external in repository https://gitbox.apache.org/repos/asf/doris.git
commit 22931c2e9220c84f1ba42a500ad66fd56582bf15 Author: Mingyu Chen (Rayner) <[email protected]> AuthorDate: Wed Dec 17 18:05:56 2025 +0800 [fix](profile) sort out parquet reader profile (#58895) Problem Summary: Refine some metrics in the parquet reader profile. 1. Rename some `Statistics` classes to make them more readable. (There are too many `Statistics` structs with the same name.) 2. Add `read page header timer` to the parquet reader profile. 3. Fix the invalid check logic for `MergeRangeFileReader` when setting the prefetch buffer size. 4. Fix the issue that the data cache profile is incorrect for external table scans. --- be/src/io/fs/buffered_reader.cpp | 3 - be/src/io/fs/buffered_reader.h | 18 --- .../parquet/vparquet_column_chunk_reader.cpp | 14 +-- .../format/parquet/vparquet_column_chunk_reader.h | 18 +-- .../exec/format/parquet/vparquet_column_reader.h | 72 +++++------- .../exec/format/parquet/vparquet_group_reader.cpp | 6 +- .../exec/format/parquet/vparquet_group_reader.h | 2 +- .../exec/format/parquet/vparquet_page_reader.cpp | 6 +- .../vec/exec/format/parquet/vparquet_page_reader.h | 8 +- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 125 +++++++++++---------- be/src/vec/exec/format/parquet/vparquet_reader.h | 15 ++- be/src/vec/exec/scan/file_scanner.cpp | 1 - 12 files changed, 129 insertions(+), 159 deletions(-) diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index cae08d28417..62774160555 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -819,12 +819,10 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset int64_t buf_remaining = _buf_end_offset - _buf_start_offset; int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset); int64_t has_read = 0; - SCOPED_RAW_TIMER(&_statistics.read_time); while (has_read < to_read) { size_t loop_read = 0; Slice result(_buf.get() + buf_remaining + has_read, to_read - has_read); 
RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result, &loop_read, io_ctx)); - _statistics.read_calls++; if (loop_read == 0) { break; } @@ -833,7 +831,6 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset if (has_read != to_read) { return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read); } - _statistics.read_bytes += to_read; _buf_end_offset += to_read; *buf = _buf.get(); return Status::OK(); diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 6bcf634aef3..5fe07176235 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -225,7 +225,6 @@ public: int64_t merged_io = 0; int64_t request_bytes = 0; int64_t merged_bytes = 0; - int64_t apply_bytes = 0; }; struct RangeCachedData { @@ -299,9 +298,6 @@ public: _merged_read_slice_size = READ_SLICE_SIZE; } - for (const PrefetchRange& range : _random_access_ranges) { - _statistics.apply_bytes += range.end_offset - range.start_offset; - } if (_profile != nullptr) { const char* random_profile = "MergedSmallIO"; ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1); @@ -315,8 +311,6 @@ public: random_profile, 1); _merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", TUnit::BYTES, random_profile, 1); - _apply_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ApplyBytes", TUnit::BYTES, - random_profile, 1); } } @@ -359,7 +353,6 @@ protected: COUNTER_UPDATE(_merged_io, _statistics.merged_io); COUNTER_UPDATE(_request_bytes, _statistics.request_bytes); COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes); - COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes); if (_reader != nullptr) { _reader->collect_profile_before_close(); } @@ -373,7 +366,6 @@ private: RuntimeProfile::Counter* _merged_io = nullptr; RuntimeProfile::Counter* _request_bytes = nullptr; RuntimeProfile::Counter* _merged_bytes = nullptr; - RuntimeProfile::Counter* _apply_bytes = nullptr; int _search_read_range(size_t 
start_offset, size_t end_offset); void _clean_cached_data(RangeCachedData& cached_data); @@ -619,12 +611,6 @@ private: */ class BufferedStreamReader { public: - struct Statistics { - int64_t read_time = 0; - int64_t read_calls = 0; - int64_t read_bytes = 0; - }; - /** * Return the address of underlying buffer that locates the start of data between [offset, offset + bytes_to_read) * @param buf the buffer address to save the start address of data @@ -637,13 +623,9 @@ public: * Save the data address to slice.data, and the slice.size is the bytes to read. */ virtual Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) = 0; - Statistics& statistics() { return _statistics; } virtual ~BufferedStreamReader() = default; // return the file path virtual std::string path() = 0; - -protected: - Statistics _statistics; }; class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector { diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index 61a25cafa26..a474b76a518 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -188,8 +188,8 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() { // check decompressed buffer size _reserve_decompress_buf(uncompressed_size); _page_data = Slice(_decompress_buf.get(), uncompressed_size); - SCOPED_RAW_TIMER(&_statistics.decompress_time); - _statistics.decompress_cnt++; + SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time); + _chunk_statistics.decompress_cnt++; RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data)); } else { // Don't need decompress @@ -204,7 +204,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() { // Initialize repetition level and definition level. Skip when level = 0, which means required field. 
if (_max_rep_level > 0) { - SCOPED_RAW_TIMER(&_statistics.decode_level_time); + SCOPED_RAW_TIMER(&_chunk_statistics.decode_level_time); if (header->__isset.data_page_header_v2) { RETURN_IF_ERROR(_rep_level_decoder.init_v2(_v2_rep_levels, _max_rep_level, _remaining_rep_nums)); @@ -215,7 +215,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() { } } if (_max_def_level > 0) { - SCOPED_RAW_TIMER(&_statistics.decode_level_time); + SCOPED_RAW_TIMER(&_chunk_statistics.decode_level_time); if (header->__isset.data_page_header_v2) { RETURN_IF_ERROR(_def_level_decoder.init_v2(_v2_def_levels, _max_def_level, _remaining_def_nums)); @@ -255,7 +255,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_decode_dict_page() { const tparquet::PageHeader* header = nullptr; RETURN_IF_ERROR(_page_reader->get_page_header(header)); DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type); - SCOPED_RAW_TIMER(&_statistics.decode_dict_time); + SCOPED_RAW_TIMER(&_chunk_statistics.decode_dict_time); // Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 specification. // Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page for Parquet 2.0+ files. 
@@ -314,7 +314,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::skip_values(size_t num_va } _remaining_num_values -= num_values; if (skip_data) { - SCOPED_RAW_TIMER(&_statistics.decode_value_time); + SCOPED_RAW_TIMER(&_chunk_statistics.decode_value_time); return _page_decoder->skip_values(num_values); } else { return Status::OK(); @@ -328,7 +328,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::decode_values( if (select_vector.num_values() == 0) { return Status::OK(); } - SCOPED_RAW_TIMER(&_statistics.decode_value_time); + SCOPED_RAW_TIMER(&_chunk_statistics.decode_value_time); if (UNLIKELY((doris_column->is_column_dictionary() || is_dict_filter) && !_has_dict)) { return Status::IOError("Not dictionary coded"); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 992bc823430..93d0f92ecec 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -60,6 +60,7 @@ struct ColumnChunkReaderStatistics { int64_t decode_level_time = 0; int64_t skip_page_header_num = 0; int64_t parse_page_header_num = 0; + int64_t read_page_header_time = 0; }; /** @@ -158,12 +159,15 @@ public: // Get page decoder Decoder* get_page_decoder() { return _page_decoder; } - ColumnChunkReaderStatistics& statistics() { - _statistics.decode_header_time = _page_reader->statistics().decode_header_time; - _statistics.skip_page_header_num = _page_reader->statistics().skip_page_header_num; - _statistics.parse_page_header_num = _page_reader->statistics().parse_page_header_num; - _statistics.read_page_header_time = _page_reader->statistics().read_page_header_time; - return _statistics; + ColumnChunkReaderStatistics& chunk_statistics() { + _chunk_statistics.decode_header_time = _page_reader->page_statistics().decode_header_time; + _chunk_statistics.skip_page_header_num = + 
_page_reader->page_statistics().skip_page_header_num; + _chunk_statistics.parse_page_header_num = + _page_reader->page_statistics().parse_page_header_num; + _chunk_statistics.read_page_header_time = + _page_reader->page_statistics().read_page_header_time; + return _chunk_statistics; } Status read_dict_values_to_column(MutableColumnPtr& doris_column) { @@ -251,7 +255,7 @@ private: // Map: encoding -> Decoder // Plain or Dictionary encoding. If the dictionary grows too big, the encoding will fall back to the plain encoding std::unordered_map<int, std::unique_ptr<Decoder>> _decoders; - ColumnChunkReaderStatistics _statistics; + ColumnChunkReaderStatistics _chunk_statistics; }; #include "common/compile_check_end.h" diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 98df81fb540..4a49473a69f 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -53,12 +53,9 @@ using ColumnString = ColumnStr<UInt32>; class ParquetColumnReader { public: - struct Statistics { - Statistics() - : read_time(0), - read_calls(0), - page_index_read_calls(0), - read_bytes(0), + struct ColumnStatistics { + ColumnStatistics() + : page_index_read_calls(0), decompress_time(0), decompress_cnt(0), decode_header_time(0), @@ -70,12 +67,8 @@ public: parse_page_header_num(0), read_page_header_time(0) {} - Statistics(io::BufferedStreamReader::Statistics& fs, ColumnChunkReaderStatistics& cs, - int64_t null_map_time) - : read_time(fs.read_time), - read_calls(fs.read_calls), - page_index_read_calls(0), - read_bytes(fs.read_bytes), + ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time) + : page_index_read_calls(0), decompress_time(cs.decompress_time), decompress_cnt(cs.decompress_cnt), decode_header_time(cs.decode_header_time), @@ -87,10 +80,7 @@ public: parse_page_header_num(cs.parse_page_header_num), 
read_page_header_time(cs.read_page_header_time) {} - int64_t read_time; - int64_t read_calls; int64_t page_index_read_calls; - int64_t read_bytes; int64_t decompress_time; int64_t decompress_cnt; int64_t decode_header_time; @@ -102,21 +92,18 @@ public: int64_t parse_page_header_num; int64_t read_page_header_time; - void merge(Statistics& statistics) { - read_time += statistics.read_time; - read_calls += statistics.read_calls; - read_bytes += statistics.read_bytes; - page_index_read_calls += statistics.page_index_read_calls; - decompress_time += statistics.decompress_time; - decompress_cnt += statistics.decompress_cnt; - decode_header_time += statistics.decode_header_time; - decode_value_time += statistics.decode_value_time; - decode_dict_time += statistics.decode_dict_time; - decode_level_time += statistics.decode_level_time; - decode_null_map_time += statistics.decode_null_map_time; - skip_page_header_num += statistics.skip_page_header_num; - parse_page_header_num += statistics.parse_page_header_num; - read_page_header_time += statistics.read_page_header_time; + void merge(ColumnStatistics& col_statistics) { + page_index_read_calls += col_statistics.page_index_read_calls; + decompress_time += col_statistics.decompress_time; + decompress_cnt += col_statistics.decompress_cnt; + decode_header_time += col_statistics.decode_header_time; + decode_value_time += col_statistics.decode_value_time; + decode_dict_time += col_statistics.decode_dict_time; + decode_level_time += col_statistics.decode_level_time; + decode_null_map_time += col_statistics.decode_null_map_time; + skip_page_header_num += col_statistics.skip_page_header_num; + parse_page_header_num += col_statistics.parse_page_header_num; + read_page_header_time += col_statistics.read_page_header_time; } }; @@ -148,7 +135,7 @@ public: const std::set<uint64_t>& filter_column_ids = {}); virtual const std::vector<level_t>& get_rep_level() const = 0; virtual const std::vector<level_t>& get_def_level() const = 0; - virtual 
Statistics statistics() = 0; + virtual ColumnStatistics column_statistics() = 0; virtual void close() = 0; virtual void reset_filter_map_index() = 0; @@ -191,9 +178,8 @@ public: MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override; const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } const std::vector<level_t>& get_def_level() const override { return _def_levels; } - Statistics statistics() override { - return Statistics(_stream_reader->statistics(), _chunk_reader->statistics(), - _decode_null_map_time); + ColumnStatistics column_statistics() override { + return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time); } void close() override {} @@ -307,7 +293,7 @@ public: const std::vector<level_t>& get_def_level() const override { return _element_reader->get_def_level(); } - Statistics statistics() override { return _element_reader->statistics(); } + ColumnStatistics column_statistics() override { return _element_reader->column_statistics(); } void close() override {} void reset_filter_map_index() override { _element_reader->reset_filter_map_index(); } @@ -338,9 +324,9 @@ public: return _key_reader->get_def_level(); } - Statistics statistics() override { - Statistics kst = _key_reader->statistics(); - Statistics vst = _value_reader->statistics(); + ColumnStatistics column_statistics() override { + ColumnStatistics kst = _key_reader->column_statistics(); + ColumnStatistics vst = _value_reader->column_statistics(); kst.merge(vst); return kst; } @@ -395,12 +381,12 @@ public: return _child_readers.begin()->second->get_def_level(); } - Statistics statistics() override { - Statistics st; + ColumnStatistics column_statistics() override { + ColumnStatistics st; for (const auto& column_name : _read_column_names) { auto reader = _child_readers.find(column_name); if (reader != _child_readers.end()) { - Statistics cst = reader->second->statistics(); + ColumnStatistics cst = 
reader->second->column_statistics(); st.merge(cst); } } @@ -493,8 +479,8 @@ public: } // Implement required pure virtual methods from base class - Statistics statistics() override { - return Statistics(); // Return empty statistics + ColumnStatistics column_statistics() override { + return ColumnStatistics(); // Return empty statistics } void close() override { diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index ad3a322e475..31161f2c6ed 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -1128,10 +1128,10 @@ void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { } } -ParquetColumnReader::Statistics RowGroupReader::statistics() { - ParquetColumnReader::Statistics st; +ParquetColumnReader::ColumnStatistics RowGroupReader::merged_column_statistics() { + ParquetColumnReader::ColumnStatistics st; for (auto& reader : _column_readers) { - auto ost = reader.second->statistics(); + auto ost = reader.second->column_statistics(); st.merge(ost); } return st; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index f81d6607349..e9c4470af17 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -173,7 +173,7 @@ public: int64_t predicate_filter_time() const { return _predicate_filter_time; } int64_t dict_filter_rewrite_time() const { return _dict_filter_rewrite_time; } - ParquetColumnReader::Statistics statistics(); + ParquetColumnReader::ColumnStatistics merged_column_statistics(); void set_remaining_rows(int64_t rows) { _remaining_rows = rows; } int64_t get_remaining_rows() { return _remaining_rows; } diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp index 
19d4edf6d26..3b6d7fdcb9b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp @@ -88,11 +88,11 @@ Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() { } header_size = std::min(header_size, max_size); { - SCOPED_RAW_TIMER(&_statistics.read_page_header_time); + SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time); RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx)); } real_header_size = cast_set<uint32_t>(header_size); - SCOPED_RAW_TIMER(&_statistics.decode_header_time); + SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); auto st = deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header); if (st.ok()) { @@ -115,7 +115,7 @@ Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() { } } - _statistics.parse_page_header_num++; + _page_statistics.parse_page_header_num++; _offset += real_header_size; _next_header_offset = _offset + _cur_page_header.compressed_page_size; _state = HEADER_PARSED; diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h index 16dc49f030d..9246819d59c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h @@ -53,7 +53,7 @@ namespace doris::vectorized { template <bool IN_COLLECTION, bool OFFSET_INDEX> class PageReader { public: - struct Statistics { + struct PageStatistics { int64_t decode_header_time = 0; int64_t skip_page_header_num = 0; int64_t parse_page_header_num = 0; @@ -83,7 +83,7 @@ public: Status parse_page_header(); Status next_page() { - _statistics.skip_page_header_num += _state == INITIALIZED; + _page_statistics.skip_page_header_num += _state == INITIALIZED; if constexpr (OFFSET_INDEX) { _page_index++; _start_row = _offset_index->page_locations[_page_index].first_row_index; @@ -133,7 +133,7 @@ public: Status 
get_page_data(Slice& slice); - Statistics& statistics() { return _statistics; } + PageStatistics& page_statistics() { return _page_statistics; } bool is_header_v2() { return _cur_page_header.__isset.data_page_header_v2; } @@ -144,7 +144,7 @@ public: private: enum PageReaderState { INITIALIZED, HEADER_PARSED, DATA_LOADED }; PageReaderState _state = INITIALIZED; - Statistics _statistics; + PageStatistics _page_statistics; io::BufferedStreamReader* _reader = nullptr; io::IOContext* _io_ctx = nullptr; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 77d8212ab01..76922608c2a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -140,9 +140,11 @@ void ParquetReader::_init_profile() { ADD_TIMER_WITH_LEVEL(_profile, parquet_profile, 1); _parquet_profile.filtered_row_groups = ADD_CHILD_COUNTER_WITH_LEVEL( - _profile, "FilteredGroups", TUnit::UNIT, parquet_profile, 1); + _profile, "RowGroupsFiltered", TUnit::UNIT, parquet_profile, 1); _parquet_profile.to_read_row_groups = ADD_CHILD_COUNTER_WITH_LEVEL( - _profile, "ReadGroups", TUnit::UNIT, parquet_profile, 1); + _profile, "RowGroupsReadNum", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.total_row_groups = ADD_CHILD_COUNTER_WITH_LEVEL( + _profile, "RowGroupsTotalNum", TUnit::UNIT, parquet_profile, 1); _parquet_profile.filtered_group_rows = ADD_CHILD_COUNTER_WITH_LEVEL( _profile, "FilteredRowsByGroup", TUnit::UNIT, parquet_profile, 1); _parquet_profile.filtered_page_rows = ADD_CHILD_COUNTER_WITH_LEVEL( @@ -153,16 +155,14 @@ void ParquetReader::_init_profile() { _profile, "FilteredBytes", TUnit::BYTES, parquet_profile, 1); _parquet_profile.raw_rows_read = ADD_CHILD_COUNTER_WITH_LEVEL( _profile, "RawRowsRead", TUnit::UNIT, parquet_profile, 1); - _parquet_profile.to_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL( - _profile, "ReadBytes", TUnit::BYTES, parquet_profile, 1); 
_parquet_profile.column_read_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ColumnReadTime", parquet_profile, 1); _parquet_profile.parse_meta_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ParseMetaTime", parquet_profile, 1); _parquet_profile.parse_footer_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ParseFooterTime", parquet_profile, 1); - _parquet_profile.open_file_time = - ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FileOpenTime", parquet_profile, 1); + _parquet_profile.file_reader_create_time = + ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FileReaderCreateTime", parquet_profile, 1); _parquet_profile.open_file_num = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "FileNum", TUnit::UNIT, parquet_profile, 1); _parquet_profile.page_index_read_calls = @@ -184,7 +184,7 @@ void ParquetReader::_init_profile() { _parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL( _profile, "DecompressCount", TUnit::UNIT, parquet_profile, 1); _parquet_profile.decode_header_time = - ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeHeaderTime", parquet_profile, 1); + ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageHeaderDecodeTime", parquet_profile, 1); _parquet_profile.read_page_header_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageHeaderReadTime", parquet_profile, 1); _parquet_profile.decode_value_time = @@ -222,8 +222,8 @@ Status ParquetReader::_open_file() { return Status::EndOfFile("stop"); } if (_file_reader == nullptr) { - SCOPED_RAW_TIMER(&_statistics.open_file_time); - ++_statistics.open_file_num; + SCOPED_RAW_TIMER(&_reader_statistics.file_reader_create_time); + ++_reader_statistics.open_file_num; _file_description.mtime = _scan_range.__isset.modification_time ? 
_scan_range.modification_time : 0; io::FileReaderOptions reader_options = @@ -237,7 +237,7 @@ Status ParquetReader::_open_file() { } if (_file_metadata == nullptr) { - SCOPED_RAW_TIMER(&_statistics.parse_footer_time); + SCOPED_RAW_TIMER(&_reader_statistics.parse_footer_time); if (_tracing_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) { // Some system may generate parquet file with only 4 bytes: PAR1 // Should consider it as empty file. @@ -251,9 +251,8 @@ Status ParquetReader::_open_file() { &meta_size, _io_ctx)); _file_metadata = _file_metadata_ptr.get(); - _column_statistics.read_bytes += meta_size; // parse magic number & parse meta data - _statistics.file_footer_read_calls += 1; + _reader_statistics.file_footer_read_calls += 1; } else { const auto& file_meta_cache_key = FileMetaCache::get_key(_tracing_file_reader, _file_description); @@ -264,10 +263,9 @@ Status ParquetReader::_open_file() { _meta_cache->insert(file_meta_cache_key, _file_metadata_ptr.release(), &_meta_cache_handle); _file_metadata = _meta_cache_handle.data<FileMetaData>(); - _column_statistics.read_bytes += meta_size; - _statistics.file_footer_read_calls += 1; + _reader_statistics.file_footer_read_calls += 1; } else { - _statistics.file_footer_hit_cache++; + _reader_statistics.file_footer_hit_cache++; } _file_metadata = _meta_cache_handle.data<FileMetaData>(); } @@ -276,9 +274,6 @@ Status ParquetReader::_open_file() { return Status::InternalError("failed to get file meta data: {}", _file_description.path); } - _column_statistics.read_bytes += meta_size; - // parse magic number & parse meta data - _column_statistics.read_calls += 1; } return Status::OK(); } @@ -337,7 +332,7 @@ Status ParquetReader::init_reader( return Status::InternalError("failed to init parquet reader, please open reader first"); } - SCOPED_RAW_TIMER(&_statistics.parse_meta_time); + SCOPED_RAW_TIMER(&_reader_statistics.parse_meta_time); _total_groups = _t_metadata->row_groups.size(); if (_total_groups == 0) { return 
Status::EndOfFile("init reader failed, empty parquet file: " + _scan_range.path); @@ -629,7 +624,7 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) return Status::OK(); } - SCOPED_RAW_TIMER(&_statistics.column_read_time); + SCOPED_RAW_TIMER(&_reader_statistics.column_read_time); Status batch_st = _current_group_reader->next_batch(block, _batch_size, read_rows, &_row_group_eof); if (batch_st.is<ErrorCode::END_OF_FILE>()) { @@ -646,11 +641,13 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) } if (_row_group_eof) { - auto column_st = _current_group_reader->statistics(); + auto column_st = _current_group_reader->merged_column_statistics(); _column_statistics.merge(column_st); - _statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows(); - _statistics.predicate_filter_time += _current_group_reader->predicate_filter_time(); - _statistics.dict_filter_rewrite_time += _current_group_reader->dict_filter_rewrite_time(); + _reader_statistics.lazy_read_filtered_rows += + _current_group_reader->lazy_read_filtered_rows(); + _reader_statistics.predicate_filter_time += _current_group_reader->predicate_filter_time(); + _reader_statistics.dict_filter_rewrite_time += + _current_group_reader->dict_filter_rewrite_time(); if (_current_row_group_index.row_group_id + 1 == _total_groups) { *eof = true; } else { @@ -742,23 +739,22 @@ Status ParquetReader::_next_row_group_reader() { group_size += column_compressed_size(field); } - _statistics.read_rows += candidate_row_ranges.count(); + _reader_statistics.read_rows += candidate_row_ranges.count(); if (_io_ctx) { _io_ctx->file_reader_stats->read_rows += candidate_row_ranges.count(); } if (candidate_row_ranges.count() != 0) { // need read this row group. 
- _statistics.read_row_groups++; - _statistics.read_bytes += group_size; - - _statistics.filtered_page_rows += row_group.num_rows - candidate_row_ranges.count(); + _reader_statistics.read_row_groups++; + _reader_statistics.filtered_page_rows += + row_group.num_rows - candidate_row_ranges.count(); break; } else { // this row group be filtered. - _statistics.filtered_row_groups++; - _statistics.filtered_bytes += group_size; - _statistics.filtered_group_rows += row_group.num_rows; + _reader_statistics.filtered_row_groups++; + _reader_statistics.filtered_bytes += group_size; + _reader_statistics.filtered_group_rows += row_group.num_rows; } } @@ -929,11 +925,10 @@ Status ParquetReader::_process_page_index_filter( Slice res(off_index_buff.data(), page_index._offset_index_size); size_t bytes_read = 0; { - SCOPED_RAW_TIMER(&_statistics.read_page_index_time); + SCOPED_RAW_TIMER(&_reader_statistics.read_page_index_time); RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._offset_index_start, res, &bytes_read, _io_ctx)); } - _column_statistics.read_bytes += bytes_read; _column_statistics.page_index_read_calls++; _col_offsets.clear(); @@ -943,7 +938,7 @@ Status ParquetReader::_process_page_index_filter( continue; } tparquet::OffsetIndex offset_index; - SCOPED_RAW_TIMER(&_statistics.parse_page_index_time); + SCOPED_RAW_TIMER(&_reader_statistics.parse_page_index_time); RETURN_IF_ERROR( page_index.parse_offset_index(chunk, off_index_buff.data(), &offset_index)); _col_offsets[parquet_col_id] = offset_index; @@ -965,14 +960,13 @@ Status ParquetReader::_process_page_index_filter( size_t bytes_read = 0; Slice result(col_index_buff.data(), page_index._column_index_size); { - SCOPED_RAW_TIMER(&_statistics.read_page_index_time); + SCOPED_RAW_TIMER(&_reader_statistics.read_page_index_time); RETURN_IF_ERROR(_tracing_file_reader->read_at(page_index._column_index_start, result, &bytes_read, _io_ctx)); } - _column_statistics.read_bytes += bytes_read; 
_column_statistics.page_index_read_calls++; - SCOPED_RAW_TIMER(&_statistics.page_index_filter_time); + SCOPED_RAW_TIMER(&_reader_statistics.page_index_filter_time); // Construct a cacheable page index structure to avoid repeatedly reading the page index of the same column. ParquetPredicate::CachedPageIndexStat cached_page_index; @@ -1009,7 +1003,7 @@ Status ParquetReader::_process_page_index_filter( tparquet::ColumnIndex column_index; { - SCOPED_RAW_TIMER(&_statistics.parse_page_index_time); + SCOPED_RAW_TIMER(&_reader_statistics.parse_page_index_time); RETURN_IF_ERROR(page_index.parse_column_index(column_chunk, col_index_buff.data(), &column_index)); } @@ -1072,7 +1066,7 @@ Status ParquetReader::_process_min_max_bloom_filter( const RowGroupReader::RowGroupIndex& row_group_index, const tparquet::RowGroup& row_group, const std::vector<std::unique_ptr<MutilColumnBlockPredicate>>& push_down_pred, RowRanges* row_ranges) { - SCOPED_RAW_TIMER(&_statistics.row_group_filter_time); + SCOPED_RAW_TIMER(&_reader_statistics.row_group_filter_time); if (!_filter_groups) { // No row group filtering is needed; // for example, Iceberg reads position delete files. 
@@ -1177,31 +1171,40 @@ void ParquetReader::_collect_profile() { if (_current_group_reader != nullptr) { _current_group_reader->collect_profile_before_close(); } - COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _statistics.filtered_row_groups); - COUNTER_UPDATE(_parquet_profile.to_read_row_groups, _statistics.read_row_groups); - COUNTER_UPDATE(_parquet_profile.filtered_group_rows, _statistics.filtered_group_rows); - COUNTER_UPDATE(_parquet_profile.filtered_page_rows, _statistics.filtered_page_rows); - COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows, _statistics.lazy_read_filtered_rows); - COUNTER_UPDATE(_parquet_profile.filtered_bytes, _statistics.filtered_bytes); - COUNTER_UPDATE(_parquet_profile.raw_rows_read, _statistics.read_rows); - COUNTER_UPDATE(_parquet_profile.to_read_bytes, _statistics.read_bytes); - COUNTER_UPDATE(_parquet_profile.column_read_time, _statistics.column_read_time); - COUNTER_UPDATE(_parquet_profile.parse_meta_time, _statistics.parse_meta_time); - COUNTER_UPDATE(_parquet_profile.parse_footer_time, _statistics.parse_footer_time); - COUNTER_UPDATE(_parquet_profile.open_file_time, _statistics.open_file_time); - COUNTER_UPDATE(_parquet_profile.open_file_num, _statistics.open_file_num); - COUNTER_UPDATE(_parquet_profile.page_index_filter_time, _statistics.page_index_filter_time); - COUNTER_UPDATE(_parquet_profile.read_page_index_time, _statistics.read_page_index_time); - COUNTER_UPDATE(_parquet_profile.parse_page_index_time, _statistics.parse_page_index_time); - COUNTER_UPDATE(_parquet_profile.row_group_filter_time, _statistics.row_group_filter_time); - COUNTER_UPDATE(_parquet_profile.file_footer_read_calls, _statistics.file_footer_read_calls); - COUNTER_UPDATE(_parquet_profile.file_footer_hit_cache, _statistics.file_footer_hit_cache); + COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _reader_statistics.filtered_row_groups); + COUNTER_UPDATE(_parquet_profile.to_read_row_groups, _reader_statistics.read_row_groups); + 
COUNTER_UPDATE(_parquet_profile.total_row_groups, _total_groups); + COUNTER_UPDATE(_parquet_profile.filtered_group_rows, _reader_statistics.filtered_group_rows); + COUNTER_UPDATE(_parquet_profile.filtered_page_rows, _reader_statistics.filtered_page_rows); + COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows, + _reader_statistics.lazy_read_filtered_rows); + COUNTER_UPDATE(_parquet_profile.filtered_bytes, _reader_statistics.filtered_bytes); + COUNTER_UPDATE(_parquet_profile.raw_rows_read, _reader_statistics.read_rows); + COUNTER_UPDATE(_parquet_profile.column_read_time, _reader_statistics.column_read_time); + COUNTER_UPDATE(_parquet_profile.parse_meta_time, _reader_statistics.parse_meta_time); + COUNTER_UPDATE(_parquet_profile.parse_footer_time, _reader_statistics.parse_footer_time); + COUNTER_UPDATE(_parquet_profile.file_reader_create_time, + _reader_statistics.file_reader_create_time); + COUNTER_UPDATE(_parquet_profile.open_file_num, _reader_statistics.open_file_num); + COUNTER_UPDATE(_parquet_profile.page_index_filter_time, + _reader_statistics.page_index_filter_time); + COUNTER_UPDATE(_parquet_profile.read_page_index_time, _reader_statistics.read_page_index_time); + COUNTER_UPDATE(_parquet_profile.parse_page_index_time, + _reader_statistics.parse_page_index_time); + COUNTER_UPDATE(_parquet_profile.row_group_filter_time, + _reader_statistics.row_group_filter_time); + COUNTER_UPDATE(_parquet_profile.file_footer_read_calls, + _reader_statistics.file_footer_read_calls); + COUNTER_UPDATE(_parquet_profile.file_footer_hit_cache, + _reader_statistics.file_footer_hit_cache); COUNTER_UPDATE(_parquet_profile.skip_page_header_num, _column_statistics.skip_page_header_num); COUNTER_UPDATE(_parquet_profile.parse_page_header_num, _column_statistics.parse_page_header_num); - COUNTER_UPDATE(_parquet_profile.predicate_filter_time, _statistics.predicate_filter_time); - COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time); + 
COUNTER_UPDATE(_parquet_profile.predicate_filter_time, + _reader_statistics.predicate_filter_time); + COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time, + _reader_statistics.dict_filter_rewrite_time); COUNTER_UPDATE(_parquet_profile.page_index_read_calls, _column_statistics.page_index_read_calls); COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 3d08632102c..6652fe73092 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -74,7 +74,7 @@ class ParquetReader : public GenericReader, public ExprPushDownHelper { ENABLE_FACTORY_CREATOR(ParquetReader); public: - struct Statistics { + struct ReaderStatistics { int32_t filtered_row_groups = 0; int32_t read_row_groups = 0; int64_t filtered_group_rows = 0; @@ -82,13 +82,12 @@ public: int64_t lazy_read_filtered_rows = 0; int64_t read_rows = 0; int64_t filtered_bytes = 0; - int64_t read_bytes = 0; int64_t column_read_time = 0; int64_t parse_meta_time = 0; int64_t parse_footer_time = 0; int64_t file_footer_read_calls = 0; int64_t file_footer_hit_cache = 0; - int64_t open_file_time = 0; + int64_t file_reader_create_time = 0; int64_t open_file_num = 0; int64_t row_group_filter_time = 0; int64_t page_index_filter_time = 0; @@ -141,7 +140,7 @@ public: Status get_parsed_schema(std::vector<std::string>* col_names, std::vector<DataTypePtr>* col_types) override; - Statistics& statistics() { return _statistics; } + ReaderStatistics& reader_statistics() { return _reader_statistics; } const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; } @@ -171,16 +170,16 @@ private: struct ParquetProfile { RuntimeProfile::Counter* filtered_row_groups = nullptr; RuntimeProfile::Counter* to_read_row_groups = nullptr; + RuntimeProfile::Counter* total_row_groups = nullptr; RuntimeProfile::Counter* 
filtered_group_rows = nullptr; RuntimeProfile::Counter* filtered_page_rows = nullptr; RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr; RuntimeProfile::Counter* filtered_bytes = nullptr; RuntimeProfile::Counter* raw_rows_read = nullptr; - RuntimeProfile::Counter* to_read_bytes = nullptr; RuntimeProfile::Counter* column_read_time = nullptr; RuntimeProfile::Counter* parse_meta_time = nullptr; RuntimeProfile::Counter* parse_footer_time = nullptr; - RuntimeProfile::Counter* open_file_time = nullptr; + RuntimeProfile::Counter* file_reader_create_time = nullptr; RuntimeProfile::Counter* open_file_num = nullptr; RuntimeProfile::Counter* row_group_filter_time = nullptr; RuntimeProfile::Counter* page_index_read_calls = nullptr; @@ -318,8 +317,8 @@ private: // _table_column_names = _missing_cols + _read_table_columns const std::vector<std::string>* _table_column_names = nullptr; - Statistics _statistics; - ParquetColumnReader::Statistics _column_statistics; + ReaderStatistics _reader_statistics; + ParquetColumnReader::ColumnStatistics _column_statistics; ParquetProfile _parquet_profile; bool _closed = false; io::IOContext* _io_ctx = nullptr; diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp index a2e74b3d547..3cc4ed9d7e5 100644 --- a/be/src/vec/exec/scan/file_scanner.cpp +++ b/be/src/vec/exec/scan/file_scanner.cpp @@ -1757,7 +1757,6 @@ void FileScanner::update_realtime_counters() { _file_cache_statistics->bytes_read_from_local - _last_bytes_read_from_local; int64_t delta_bytes_read_from_remote = _file_cache_statistics->bytes_read_from_remote - _last_bytes_read_from_remote; - if (_file_cache_statistics->bytes_read_from_local == 0 && _file_cache_statistics->bytes_read_from_remote == 0) { _state->get_query_ctx() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
