morningman commented on code in PR #59307:
URL: https://github.com/apache/doris/pull/59307#discussion_r2706700447
##########
be/src/vec/exec/format/parquet/vparquet_page_reader.cpp:
##########
@@ -77,11 +83,79 @@ Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
         return Status::IOError("Should skip or load current page to get next page");
     }
+    _page_statistics.page_read_counter += 1;
+
+    // Parse page header from file; header bytes are saved for possible cache insertion
     const uint8_t* page_header_buf = nullptr;
     size_t max_size = _end_offset - _offset;
     size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
     const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
     uint32_t real_header_size = 0;
+
+    // Try a header-only lookup in the page cache. Cached pages store
+    // header + optional v2 levels + uncompressed payload, so we can
+    // parse the page header directly from the cached bytes and avoid
+    // a file read for the header.
+    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+        StoragePageCache::instance() != nullptr) {
+        PageCacheHandle handle;
+        StoragePageCache::CacheKey key(fmt::format("{}::{}", _reader->path(), _reader->mtime()),
+                                       _end_offset, _offset);
+        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
+            // Parse header directly from cached data
+            _page_cache_handle = std::move(handle);
+            Slice s = _page_cache_handle.data();
+            real_header_size = cast_set<uint32_t>(s.size);
+            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
+            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
+                                             &real_header_size, true, &_cur_page_header);
+            if (!st.ok()) return st;
+            // Increment page cache counters for a true cache hit on header+payload
+            _page_statistics.page_cache_hit_counter += 1;
+            // Detect whether the cached payload is compressed or decompressed and record
+            bool is_cache_payload_decompressed = true;
+            if (_cur_page_header.compressed_page_size > 0) {
Review Comment:
Better to extract this check logic:
```
bool should_cache_decompressed(const tparquet::PageHeader* header,
                               const tparquet::ColumnMetaData& metadata) {
    if (header->compressed_page_size <= 0) return true;
    if (metadata.codec == tparquet::CompressionCodec::UNCOMPRESSED) return true;
    double ratio = static_cast<double>(header->uncompressed_page_size) /
                   static_cast<double>(header->compressed_page_size);
    return ratio <= config::parquet_page_cache_decompress_threshold;
}
```
and reuse it both here and in `be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp`.
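Both call sites could then collapse to something like the sketch below (hedged: `_cur_page_header` comes from the diff above, while `_metadata` is an assumed name for the reader's `tparquet::ColumnMetaData` member):
```
// Hypothetical call site: the decision whether to cache the payload
// decompressed is made in one shared place instead of being duplicated.
bool is_cache_payload_decompressed = should_cache_decompressed(&_cur_page_header, _metadata);
```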
##########
be/src/io/file_factory.cpp:
##########
@@ -203,6 +203,21 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
         const io::FileSystemProperties& system_properties,
         const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
         RuntimeProfile* profile) {
+    auto reader_res = _create_file_reader_internal(system_properties, file_description,
+                                                   reader_options, profile);
+    if (!reader_res.has_value()) {
+        return unexpected(std::move(reader_res).error());
+    }
+    auto file_reader = std::move(reader_res).value();
+    LOG_INFO("create file reader for path={}, size={}, mtime={}", file_description.path,
Review Comment:
Remove this log, or use DEBUG level.
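A minimal sketch of the DEBUG-level alternative, assuming Doris's stream-style `VLOG_DEBUG` macro from `be/src/common/logging.h`; the size/mtime arguments are elided in the diff above, so only the path is shown:
```
// Only emitted when verbose logging is enabled, so opening many files
// no longer floods the INFO log on every create_file_reader call.
VLOG_DEBUG << "create file reader for path=" << file_description.path;
```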
##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp:
##########
@@ -305,6 +495,32 @@ void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_reserve_decompress_buf(siz
     }
 }
+template <bool IN_COLLECTION, bool OFFSET_INDEX>
+void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_insert_page_into_cache(
+        const std::vector<uint8_t>& level_bytes, const Slice& payload) {
+    StoragePageCache::CacheKey key(
+            fmt::format("{}::{}", _stream_reader->path(), _stream_reader->mtime()),
+            _page_reader->file_end_offset(), _page_reader->header_start_offset());
+    const std::vector<uint8_t>& header_bytes = _page_reader->header_bytes();
+    size_t total = header_bytes.size() + level_bytes.size() + payload.size;
+    auto* page = new DataPage(total, true, segment_v2::DATA_PAGE);
Review Comment:
Potential memory leak? Better to use a safer (RAII) approach.
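A hedged sketch of one RAII option; the `insert` call's signature is assumed to mirror the `lookup` call in the page reader diff, so adapt it to the actual `StoragePageCache` API:
```
// Own the page through a unique_ptr so any early return before the cache
// insert frees it automatically instead of leaking the raw pointer.
auto page = std::make_unique<DataPage>(total, true, segment_v2::DATA_PAGE);
// ... copy header_bytes, level_bytes, and payload into the page ...
PageCacheHandle handle;
// Assumed signature; release ownership only once the cache has taken the page.
StoragePageCache::instance()->insert(key, page.get(), &handle, segment_v2::DATA_PAGE);
page.release();
```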
##########
be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp:
##########
@@ -305,6 +495,32 @@ void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_reserve_decompress_buf(siz
     }
 }
+template <bool IN_COLLECTION, bool OFFSET_INDEX>
+void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_insert_page_into_cache(
+        const std::vector<uint8_t>& level_bytes, const Slice& payload) {
+    StoragePageCache::CacheKey key(
+            fmt::format("{}::{}", _stream_reader->path(), _stream_reader->mtime()),
Review Comment:
The `fmt::format("{}::{}", _stream_reader->path(), _stream_reader->mtime())` part is the same for every page; better to cache it for reuse:
```
class ParquetPageCacheKeyBuilder {
    std::string _file_key_prefix; // Cached once per column chunk
public:
    void init(const std::string& path, int64_t mtime) {
        _file_key_prefix = fmt::format("{}::{}", path, mtime);
    }
    StoragePageCache::CacheKey make_key(uint64_t end_offset, uint64_t offset) const {
        return StoragePageCache::CacheKey(_file_key_prefix, end_offset, offset);
    }
};
```
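Usage would then look like this (a sketch; `_key_builder` is a hypothetical member of the column chunk reader):
```
// Build the file-level prefix once when the column chunk reader is set up...
_key_builder.init(_stream_reader->path(), _stream_reader->mtime());
// ...then derive per-page keys without re-formatting the prefix each time.
StoragePageCache::CacheKey key = _key_builder.make_key(
        _page_reader->file_end_offset(), _page_reader->header_start_offset());
```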
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]