This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new e8f040fe3c3 branch-4.1: [feature](vparquet-reader) Implements parquet file page cache. (#59307) (#61477)
e8f040fe3c3 is described below
commit e8f040fe3c3da61c1166712dfe0fc8655f45a3ed
Author: Qi Chen <[email protected]>
AuthorDate: Thu Mar 19 10:52:44 2026 +0800
branch-4.1: [feature](vparquet-reader) Implements parquet file page cache. (#59307) (#61477)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #59307
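Problem Summary: Cherry-pick of #59307 onto branch-4.1. It adds a page cache to the Parquet reader, enabled when `enable_parquet_file_page_cache` is set.
- Column chunk readers first look up data pages and dictionary pages in `StoragePageCache`.
- Pages read from the file are inserted into the cache. The cached payload is either decompressed or compressed, depending on `parquet_page_cache_decompress_threshold` and `enable_parquet_cache_compressed_pages`.
- To support this, every `FileReader` now exposes `mtime()`.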
### Release note
Cherry-pick #59307
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add the documentation PR link here, e.g. https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add the branch-pick labels for the branches this PR should be merged into -->
---
be/src/common/config.cpp | 6 +
be/src/common/config.h | 6 +
be/src/io/cache/cached_remote_file_reader.h | 2 +
be/src/io/file_factory.cpp | 12 +
be/src/io/file_factory.h | 6 +
be/src/io/fs/broker_file_reader.cpp | 6 +-
be/src/io/fs/broker_file_reader.h | 5 +-
be/src/io/fs/broker_file_system.cpp | 2 +-
be/src/io/fs/buffered_reader.h | 12 +
be/src/io/fs/file_reader.h | 3 +
be/src/io/fs/hdfs_file_reader.cpp | 7 +-
be/src/io/fs/hdfs_file_reader.h | 5 +-
be/src/io/fs/http_file_reader.cpp | 7 +-
be/src/io/fs/http_file_reader.h | 5 +-
be/src/io/fs/http_file_system.cpp | 2 +-
be/src/io/fs/local_file_reader.h | 2 +
be/src/io/fs/packed_file_reader.h | 2 +
be/src/io/fs/s3_file_reader.h | 2 +
be/src/io/fs/stream_load_pipe.h | 2 +
be/src/io/fs/tracing_file_reader.h | 2 +
be/src/vec/exec/format/orc/orc_file_reader.h | 2 +
.../parquet/vparquet_column_chunk_reader.cpp | 294 ++++++--
.../format/parquet/vparquet_column_chunk_reader.h | 45 +-
.../exec/format/parquet/vparquet_column_reader.cpp | 31 +-
.../exec/format/parquet/vparquet_column_reader.h | 45 +-
.../exec/format/parquet/vparquet_group_reader.cpp | 2 +-
.../exec/format/parquet/vparquet_page_reader.cpp | 76 ++
.../vec/exec/format/parquet/vparquet_page_reader.h | 96 ++-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 31 +
be/src/vec/exec/format/parquet/vparquet_reader.h | 8 +
be/test/io/fs/buffered_reader_test.cpp | 6 +
be/test/io/fs/packed_file_concurrency_test.cpp | 2 +
be/test/io/fs/packed_file_reader_test.cpp | 2 +
be/test/io/fs/packed_file_system_test.cpp | 2 +
.../format/file_reader/file_meta_cache_test.cpp | 2 +
.../format/parquet/parquet_page_cache_test.cpp | 804 +++++++++++++++++++++
.../exec/format/parquet/parquet_thrift_test.cpp | 3 +-
be/test/vec/exec/orc/orc_file_reader_test.cpp | 2 +
.../java/org/apache/doris/qe/SessionVariable.java | 12 +
gensrc/thrift/PaloInternalService.thrift | 2 +
40 files changed, 1469 insertions(+), 94 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 372a33dc886..c74d46069b2 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -404,6 +404,12 @@ DEFINE_Int32(index_page_cache_percentage, "10");
DEFINE_mBool(disable_storage_page_cache, "false");
// whether to disable row cache feature in storage
DEFINE_mBool(disable_storage_row_cache, "true");
+// Parquet page cache: threshold ratio for caching decompressed vs compressed pages
+// If uncompressed_size / compressed_size <= threshold, cache decompressed;
+// otherwise cache compressed if enable_parquet_cache_compressed_pages = true
+DEFINE_Double(parquet_page_cache_decompress_threshold, "1.5");
+// Parquet page cache: whether to enable caching compressed pages (when ratio exceeds threshold)
+DEFINE_Bool(enable_parquet_cache_compressed_pages, "false");
// whether to disable pk page cache feature in storage
DEFINE_Bool(disable_pk_storage_page_cache, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f8416557a2e..c7c06f00722 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -472,6 +472,12 @@ DECLARE_Int32(index_page_cache_percentage);
DECLARE_Bool(disable_storage_page_cache);
// whether to disable row cache feature in storage
DECLARE_mBool(disable_storage_row_cache);
+// Parquet page cache: threshold ratio for caching decompressed vs compressed pages
+// If uncompressed_size / compressed_size <= threshold, cache decompressed;
+// otherwise cache compressed if enable_parquet_cache_compressed_pages = true
+DECLARE_Double(parquet_page_cache_decompress_threshold);
+// Parquet page cache: whether to enable caching compressed pages (when ratio exceeds threshold)
+DECLARE_Bool(enable_parquet_cache_compressed_pages);
// whether to disable pk page cache feature in storage
DECLARE_Bool(disable_pk_storage_page_cache);
diff --git a/be/src/io/cache/cached_remote_file_reader.h
b/be/src/io/cache/cached_remote_file_reader.h
index 939471b62ea..20c1a47ce88 100644
--- a/be/src/io/cache/cached_remote_file_reader.h
+++ b/be/src/io/cache/cached_remote_file_reader.h
@@ -55,6 +55,8 @@ public:
static std::pair<size_t, size_t> s_align_size(size_t offset, size_t size,
size_t length);
+ int64_t mtime() const override { return _remote_file_reader->mtime(); }
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index bd08bc20461..7b1821c0fa8 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -203,6 +203,18 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
const io::FileSystemProperties& system_properties,
const io::FileDescription& file_description, const
io::FileReaderOptions& reader_options,
RuntimeProfile* profile) {
+ auto reader_res = _create_file_reader_internal(system_properties,
file_description,
+ reader_options, profile);
+ if (!reader_res.has_value()) {
+ return unexpected(std::move(reader_res).error());
+ }
+ return std::move(reader_res).value();
+}
+
+Result<io::FileReaderSPtr> FileFactory::_create_file_reader_internal(
+ const io::FileSystemProperties& system_properties,
+ const io::FileDescription& file_description, const
io::FileReaderOptions& reader_options,
+ RuntimeProfile* profile) {
TFileType::type type = system_properties.system_type;
switch (type) {
case TFileType::FILE_LOCAL: {
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 0ba791bd0a3..61e322ca0af 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -126,6 +126,12 @@ public:
private:
static std::string _get_fs_name(const io::FileDescription&
file_description);
+
+ /// Create FileReader without FS
+ static Result<io::FileReaderSPtr> _create_file_reader_internal(
+ const io::FileSystemProperties& system_properties,
+ const io::FileDescription& file_description,
+ const io::FileReaderOptions& reader_options, RuntimeProfile*
profile = nullptr);
};
} // namespace doris
diff --git a/be/src/io/fs/broker_file_reader.cpp
b/be/src/io/fs/broker_file_reader.cpp
index 102ea3e2477..41b2992f700 100644
--- a/be/src/io/fs/broker_file_reader.cpp
+++ b/be/src/io/fs/broker_file_reader.cpp
@@ -39,12 +39,14 @@ struct IOContext;
BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path
path, size_t file_size,
TBrokerFD fd,
- std::shared_ptr<BrokerServiceConnection>
connection)
+ std::shared_ptr<BrokerServiceConnection>
connection,
+ int64_t mtime)
: _path(std::move(path)),
_file_size(file_size),
_broker_addr(broker_addr),
_fd(fd),
- _connection(std::move(connection)) {
+ _connection(std::move(connection)),
+ _mtime(mtime) {
DorisMetrics::instance()->broker_file_open_reading->increment(1);
DorisMetrics::instance()->broker_file_reader_total->increment(1);
}
diff --git a/be/src/io/fs/broker_file_reader.h
b/be/src/io/fs/broker_file_reader.h
index 7d19edb32c0..2f6bd94b652 100644
--- a/be/src/io/fs/broker_file_reader.h
+++ b/be/src/io/fs/broker_file_reader.h
@@ -38,7 +38,7 @@ struct IOContext;
class BrokerFileReader final : public FileReader {
public:
BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t
file_size, TBrokerFD fd,
- std::shared_ptr<BrokerServiceConnection> connection);
+ std::shared_ptr<BrokerServiceConnection> connection,
int64_t mtime = 0);
~BrokerFileReader() override;
@@ -50,6 +50,8 @@ public:
bool closed() const override { return
_closed.load(std::memory_order_acquire); }
+ int64_t mtime() const override { return _mtime; }
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
@@ -62,6 +64,7 @@ private:
TBrokerFD _fd;
std::shared_ptr<BrokerServiceConnection> _connection;
+ int64_t _mtime;
std::atomic<bool> _closed = false;
};
} // namespace doris::io
diff --git a/be/src/io/fs/broker_file_system.cpp
b/be/src/io/fs/broker_file_system.cpp
index 8b0d5db23e2..b0dc89dc277 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -139,7 +139,7 @@ Status BrokerFileSystem::open_file_internal(const Path&
file, FileReaderSPtr* re
error_msg(response->opStatus.message));
}
*reader = std::make_shared<BrokerFileReader>(_broker_addr, file, fsize,
response->fd,
- _connection);
+ _connection, opts.mtime);
return Status::OK();
}
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 5fe07176235..6ddcca02067 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -160,6 +160,8 @@ public:
bool closed() const override { return _closed; }
+ int64_t mtime() const override { return _inner_reader->mtime(); }
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
@@ -329,6 +331,8 @@ public:
bool closed() const override { return _closed; }
+ int64_t mtime() const override { return _reader->mtime(); }
+
// for test only
size_t buffer_remaining() const { return _remaining; }
@@ -532,6 +536,8 @@ public:
bool closed() const override { return _closed; }
+ int64_t mtime() const override { return _reader->mtime(); }
+
void set_random_access_ranges(const std::vector<PrefetchRange>*
random_access_ranges) {
_random_access_ranges = random_access_ranges;
for (auto& _pre_buffer : _pre_buffers) {
@@ -592,6 +598,8 @@ public:
bool closed() const override { return _closed; }
+ int64_t mtime() const override { return _reader->mtime(); }
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
@@ -626,6 +634,8 @@ public:
virtual ~BufferedStreamReader() = default;
// return the file path
virtual std::string path() = 0;
+
+ virtual int64_t mtime() const = 0;
};
class BufferedFileStreamReader : public BufferedStreamReader, public
ProfileCollector {
@@ -639,6 +649,8 @@ public:
Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx)
override;
std::string path() override { return _file->path(); }
+ int64_t mtime() const override { return _file->mtime(); }
+
protected:
void _collect_profile_before_close() override {
if (_file != nullptr) {
diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h
index e6d8527e831..3df912cbad4 100644
--- a/be/src/io/fs/file_reader.h
+++ b/be/src/io/fs/file_reader.h
@@ -90,6 +90,9 @@ public:
virtual const std::string& get_data_dir_path() { return
VIRTUAL_REMOTE_DATA_DIR; }
+ // File modification time (seconds since epoch); implementations return 0 when it is unknown.
+ virtual int64_t mtime() const = 0;
+
protected:
virtual Status read_at_impl(size_t offset, Slice result, size_t*
bytes_read,
const IOContext* io_ctx) = 0;
diff --git a/be/src/io/fs/hdfs_file_reader.cpp
b/be/src/io/fs/hdfs_file_reader.cpp
index 0e278dff0c8..b1d65a63ba0 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -66,16 +66,17 @@ Result<FileReaderSPtr> HdfsFileReader::create(Path
full_path, const hdfsFS& fs,
auto path = convert_path(full_path, fs_name);
return get_file(fs, path, opts.mtime, opts.file_size).transform([&](auto&&
accessor) {
return std::make_shared<HdfsFileReader>(std::move(path),
std::move(fs_name),
- std::move(accessor), profile);
+ std::move(accessor), profile,
opts.mtime);
});
}
HdfsFileReader::HdfsFileReader(Path path, std::string fs_name,
FileHandleCache::Accessor accessor,
- RuntimeProfile* profile)
+ RuntimeProfile* profile, int64_t mtime)
: _path(std::move(path)),
_fs_name(std::move(fs_name)),
_accessor(std::move(accessor)),
- _profile(profile) {
+ _profile(profile),
+ _mtime(mtime) {
_handle = _accessor.get();
DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index 8556eea0de6..08f98bca29a 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -45,7 +45,7 @@ public:
const FileReaderOptions& opts,
RuntimeProfile* profile);
HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor
accessor,
- RuntimeProfile* profile);
+ RuntimeProfile* profile, int64_t mtime = 0);
~HdfsFileReader() override;
@@ -57,6 +57,8 @@ public:
bool closed() const override { return
_closed.load(std::memory_order_acquire); }
+ int64_t mtime() const override { return _mtime; }
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
@@ -86,6 +88,7 @@ private:
CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle
std::atomic<bool> _closed = false;
RuntimeProfile* _profile = nullptr;
+ int64_t _mtime;
#ifdef USE_HADOOP_HDFS
HDFSProfile _hdfs_profile;
#endif
diff --git a/be/src/io/fs/http_file_reader.cpp
b/be/src/io/fs/http_file_reader.cpp
index fb243179baf..5ad984039fc 100644
--- a/be/src/io/fs/http_file_reader.cpp
+++ b/be/src/io/fs/http_file_reader.cpp
@@ -34,7 +34,7 @@ Result<FileReaderSPtr> HttpFileReader::create(const
std::string& url,
ofi.path = Path(url);
ofi.extend_info = props;
- auto reader = std::make_shared<HttpFileReader>(ofi, url);
+ auto reader = std::make_shared<HttpFileReader>(ofi, url, opts.mtime);
// Open the file to detect Range support and validate configuration
RETURN_IF_ERROR_RESULT(reader->open(opts));
@@ -42,11 +42,12 @@ Result<FileReaderSPtr> HttpFileReader::create(const
std::string& url,
return reader;
}
-HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url)
+HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url,
int64_t mtime)
: _extend_kv(fileInfo.extend_info),
_path(fileInfo.path),
_url(std::move(url)),
- _client(std::make_unique<HttpClient>()) {
+ _client(std::make_unique<HttpClient>()),
+ _mtime(mtime) {
auto etag_iter = _extend_kv.find("etag");
if (etag_iter != _extend_kv.end()) {
_etag = etag_iter->second;
diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h
index 607eedf3d1a..982e65905aa 100644
--- a/be/src/io/fs/http_file_reader.h
+++ b/be/src/io/fs/http_file_reader.h
@@ -41,7 +41,7 @@ public:
const std::map<std::string,
std::string>& props,
const FileReaderOptions& opts,
RuntimeProfile* profile);
- explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url);
+ explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url,
int64_t mtime);
~HttpFileReader() override;
Status open(const FileReaderOptions& opts);
@@ -52,6 +52,8 @@ public:
bool closed() const override { return
_closed.load(std::memory_order_acquire); }
size_t size() const override { return _file_size; }
+ int64_t mtime() const override { return _mtime; }
+
private:
// Prepare and initialize the HTTP client for a new request
Status prepare_client(bool set_fail_on_error = true);
@@ -78,6 +80,7 @@ private:
int64_t _last_modified = 0;
std::atomic<bool> _closed = false;
std::unique_ptr<HttpClient> _client;
+ int64_t _mtime;
// Configuration for non-Range request handling
bool _enable_range_request = true; // Whether
Range request is required
diff --git a/be/src/io/fs/http_file_system.cpp
b/be/src/io/fs/http_file_system.cpp
index 92e175ca774..b1e8de354ad 100644
--- a/be/src/io/fs/http_file_system.cpp
+++ b/be/src/io/fs/http_file_system.cpp
@@ -56,7 +56,7 @@ Status HttpFileSystem::open_file_internal(const Path& path,
FileReaderSPtr* read
// Pass properties (including HTTP headers) to the file reader
file_info.extend_info = _properties;
- auto http_reader = std::make_shared<HttpFileReader>(file_info,
path.native());
+ auto http_reader = std::make_shared<HttpFileReader>(file_info,
path.native(), opts.mtime);
RETURN_IF_ERROR(http_reader->open(opts));
*reader = http_reader;
return Status::OK();
diff --git a/be/src/io/fs/local_file_reader.h b/be/src/io/fs/local_file_reader.h
index 38b4cfa55af..e11bb152b67 100644
--- a/be/src/io/fs/local_file_reader.h
+++ b/be/src/io/fs/local_file_reader.h
@@ -63,6 +63,8 @@ public:
const std::string& get_data_dir_path() override { return _data_dir_path; }
+ int64_t mtime() const override { return 0; }
+
private:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
diff --git a/be/src/io/fs/packed_file_reader.h
b/be/src/io/fs/packed_file_reader.h
index 79cd23c8cd7..b6b423fbbfe 100644
--- a/be/src/io/fs/packed_file_reader.h
+++ b/be/src/io/fs/packed_file_reader.h
@@ -48,6 +48,8 @@ public:
bool closed() const override { return _closed; }
+ int64_t mtime() const override { return _inner_reader->mtime(); }
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h
index 58294ec1891..40e3ac61d3a 100644
--- a/be/src/io/fs/s3_file_reader.h
+++ b/be/src/io/fs/s3_file_reader.h
@@ -53,6 +53,8 @@ public:
bool closed() const override { return
_closed.load(std::memory_order_acquire); }
+ int64_t mtime() const override { return 0; }
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index cedab0b6c17..df137a9267c 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -57,6 +57,8 @@ public:
size_t size() const override { return 0; }
+ int64_t mtime() const override { return 0; }
+
// called when consumer finished
Status close() override {
if (!(_finished || _cancelled)) {
diff --git a/be/src/io/fs/tracing_file_reader.h
b/be/src/io/fs/tracing_file_reader.h
index 39b70dfbb63..7a6651afd21 100644
--- a/be/src/io/fs/tracing_file_reader.h
+++ b/be/src/io/fs/tracing_file_reader.h
@@ -47,6 +47,8 @@ public:
void _collect_profile_at_runtime() override { return
_inner->collect_profile_at_runtime(); }
void _collect_profile_before_close() override { return
_inner->collect_profile_before_close(); }
+ int64_t mtime() const override { return _inner->mtime(); }
+
FileReaderStats* stats() const { return _stats; }
doris::io::FileReaderSPtr inner_reader() { return _inner; }
diff --git a/be/src/vec/exec/format/orc/orc_file_reader.h
b/be/src/vec/exec/format/orc/orc_file_reader.h
index 503777e67c2..15aeed33242 100644
--- a/be/src/vec/exec/format/orc/orc_file_reader.h
+++ b/be/src/vec/exec/format/orc/orc_file_reader.h
@@ -54,6 +54,8 @@ public:
bool closed() const override { return _closed; }
+ int64_t mtime() const override { return _inner_reader->mtime(); }
+
// for test only
const Statistics& statistics() const { return _statistics; }
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index e7f5f94d32f..7e228e58dff 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -22,9 +22,12 @@
#include <string.h>
#include <cstdint>
+#include <memory>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
+#include "io/fs/buffered_reader.h"
+#include "olap/page_cache.h"
#include "util/bit_util.h"
#include "util/block_compression.h"
#include "util/runtime_profile.h"
@@ -52,7 +55,7 @@ template <bool IN_COLLECTION, bool OFFSET_INDEX>
ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::ColumnChunkReader(
io::BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
FieldSchema* field_schema, const tparquet::OffsetIndex* offset_index,
size_t total_rows,
- io::IOContext* io_ctx)
+ io::IOContext* io_ctx, const ParquetPageReadContext& page_read_ctx)
: _field_schema(field_schema),
_max_rep_level(field_schema->repetition_level),
_max_def_level(field_schema->definition_level),
@@ -60,7 +63,8 @@ ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::ColumnChunkReader(
_metadata(column_chunk->meta_data),
_offset_index(offset_index),
_total_rows(total_rows),
- _io_ctx(io_ctx) {}
+ _io_ctx(io_ctx),
+ _page_read_ctx(page_read_ctx) {}
template <bool IN_COLLECTION, bool OFFSET_INDEX>
Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::init() {
@@ -69,7 +73,8 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::init()
{
size_t chunk_size = _metadata.total_compressed_size;
// create page reader
_page_reader = create_page_reader<IN_COLLECTION, OFFSET_INDEX>(
- _stream_reader, _io_ctx, start_offset, chunk_size, _total_rows,
_offset_index);
+ _stream_reader, _io_ctx, start_offset, chunk_size, _total_rows,
_metadata,
+ _page_read_ctx, _offset_index);
// get the block compression codec
RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec,
&_block_compress_codec));
_state = INITIALIZED;
@@ -104,7 +109,7 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::_parse_first_page_header(
RETURN_IF_ERROR(parse_page_header());
const tparquet::PageHeader* header = nullptr;
- RETURN_IF_ERROR(_page_reader->get_page_header(header));
+ RETURN_IF_ERROR(_page_reader->get_page_header(&header));
if (header->type == tparquet::PageType::DICTIONARY_PAGE) {
// the first page maybe directory page even if
_metadata.__isset.dictionary_page_offset == false,
// so we should parse the directory page in next_page()
@@ -125,8 +130,7 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::parse_page_header() {
RETURN_IF_ERROR(_page_reader->parse_page_header());
const tparquet::PageHeader* header = nullptr;
- ;
- RETURN_IF_ERROR(_page_reader->get_page_header(header));
+ RETURN_IF_ERROR(_page_reader->get_page_header(&header));
int32_t page_num_values = _page_reader->is_header_v2() ?
header->data_page_header_v2.num_values
:
header->data_page_header.num_values;
_remaining_rep_nums = page_num_values;
@@ -169,37 +173,144 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::load_page_data() {
}
const tparquet::PageHeader* header = nullptr;
- RETURN_IF_ERROR(_page_reader->get_page_header(header));
+ RETURN_IF_ERROR(_page_reader->get_page_header(&header));
int32_t uncompressed_size = header->uncompressed_page_size;
+ bool page_loaded = false;
+
+ // First, try to reuse a cache handle previously discovered by PageReader
+ // (header-only lookup) to avoid a second lookup here.
+ if (_page_read_ctx.enable_parquet_file_page_cache &&
!config::disable_storage_page_cache &&
+ StoragePageCache::instance() != nullptr) {
+ if (_page_reader->has_page_cache_handle()) {
+ const PageCacheHandle& handle = _page_reader->page_cache_handle();
+ Slice cached = handle.data();
+ size_t header_size = _page_reader->header_bytes().size();
+ size_t levels_size = 0;
+ if (header->__isset.data_page_header_v2) {
+ const tparquet::DataPageHeaderV2& header_v2 =
header->data_page_header_v2;
+ size_t rl = header_v2.repetition_levels_byte_length;
+ size_t dl = header_v2.definition_levels_byte_length;
+ levels_size = rl + dl;
+ _v2_rep_levels =
+ Slice(reinterpret_cast<const uint8_t*>(cached.data) +
header_size, rl);
+ _v2_def_levels =
+ Slice(reinterpret_cast<const uint8_t*>(cached.data) +
header_size + rl, dl);
+ }
+ // payload_slice points to the bytes after header and levels
+ Slice payload_slice(cached.data + header_size + levels_size,
+ cached.size - header_size - levels_size);
- if (_block_compress_codec != nullptr) {
- Slice compressed_data;
- RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
- if (header->__isset.data_page_header_v2) {
- const tparquet::DataPageHeaderV2& header_v2 =
header->data_page_header_v2;
- // uncompressed_size = rl + dl + uncompressed_data_size
- // compressed_size = rl + dl + compressed_data_size
- uncompressed_size -= header_v2.repetition_levels_byte_length +
- header_v2.definition_levels_byte_length;
- _get_uncompressed_levels(header_v2, compressed_data);
+ bool cache_payload_is_decompressed =
_page_reader->is_cache_payload_decompressed();
+
+ if (cache_payload_is_decompressed) {
+ // Cached payload is already uncompressed
+ _page_data = payload_slice;
+ } else {
+ CHECK(_block_compress_codec);
+ // Decompress cached payload into _decompress_buf for decoding
+ size_t uncompressed_payload_size =
+ header->__isset.data_page_header_v2
+ ?
static_cast<size_t>(header->uncompressed_page_size) - levels_size
+ :
static_cast<size_t>(header->uncompressed_page_size);
+ _reserve_decompress_buf(uncompressed_payload_size);
+ _page_data = Slice(_decompress_buf.get(),
uncompressed_payload_size);
+ SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
+ _chunk_statistics.decompress_cnt++;
+
RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &_page_data));
+ }
+ // page cache counters were incremented when PageReader did the
header-only
+ // cache lookup. Do not increment again to avoid double-counting.
+ page_loaded = true;
}
- bool is_v2_compressed =
- header->__isset.data_page_header_v2 &&
header->data_page_header_v2.is_compressed;
- if (header->__isset.data_page_header || is_v2_compressed) {
- // check decompressed buffer size
- _reserve_decompress_buf(uncompressed_size);
- _page_data = Slice(_decompress_buf.get(), uncompressed_size);
- SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
- _chunk_statistics.decompress_cnt++;
- RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data,
&_page_data));
+ }
+
+ if (!page_loaded) {
+ if (_block_compress_codec != nullptr) {
+ Slice compressed_data;
+ RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
+ std::vector<uint8_t> level_bytes;
+ if (header->__isset.data_page_header_v2) {
+ const tparquet::DataPageHeaderV2& header_v2 =
header->data_page_header_v2;
+ // uncompressed_size = rl + dl + uncompressed_data_size
+ // compressed_size = rl + dl + compressed_data_size
+ uncompressed_size -= header_v2.repetition_levels_byte_length +
+ header_v2.definition_levels_byte_length;
+ // copy level bytes (rl + dl) so that we can cache header +
levels + uncompressed payload
+ size_t rl = header_v2.repetition_levels_byte_length;
+ size_t dl = header_v2.definition_levels_byte_length;
+ size_t level_sz = rl + dl;
+ if (level_sz > 0) {
+ level_bytes.resize(level_sz);
+ memcpy(level_bytes.data(), compressed_data.data, level_sz);
+ }
+ // now remove levels from compressed_data for decompression
+ _get_uncompressed_levels(header_v2, compressed_data);
+ }
+ bool is_v2_compressed = header->__isset.data_page_header_v2 &&
+ header->data_page_header_v2.is_compressed;
+ bool page_has_compression = header->__isset.data_page_header ||
is_v2_compressed;
+
+ if (page_has_compression) {
+ // Decompress payload for immediate decoding
+ _reserve_decompress_buf(uncompressed_size);
+ _page_data = Slice(_decompress_buf.get(), uncompressed_size);
+ SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
+ _chunk_statistics.decompress_cnt++;
+
RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data,
&_page_data));
+
+ // Decide whether to cache decompressed payload or compressed
payload based on threshold
+ bool cache_payload_decompressed =
should_cache_decompressed(header, _metadata);
+
+ if (_page_read_ctx.enable_parquet_file_page_cache &&
+ !config::disable_storage_page_cache &&
+ StoragePageCache::instance() != nullptr &&
+ !_page_reader->header_bytes().empty()) {
+ if (cache_payload_decompressed) {
+ _insert_page_into_cache(level_bytes, _page_data);
+
_chunk_statistics.page_cache_decompressed_write_counter += 1;
+ } else {
+ if (config::enable_parquet_cache_compressed_pages) {
+ // cache the compressed payload as-is (header |
levels | compressed_payload)
+ _insert_page_into_cache(
+ level_bytes, Slice(compressed_data.data,
compressed_data.size));
+
_chunk_statistics.page_cache_compressed_write_counter += 1;
+ }
+ }
+ }
+ } else {
+ // no compression on this page, use the data directly
+ _page_data = Slice(compressed_data.data, compressed_data.size);
+ if (_page_read_ctx.enable_parquet_file_page_cache &&
+ !config::disable_storage_page_cache &&
+ StoragePageCache::instance() != nullptr) {
+ _insert_page_into_cache(level_bytes, _page_data);
+ _chunk_statistics.page_cache_decompressed_write_counter +=
1;
+ }
+ }
} else {
- // Don't need decompress
- _page_data = Slice(compressed_data.data, compressed_data.size);
- }
- } else {
- RETURN_IF_ERROR(_page_reader->get_page_data(_page_data));
- if (header->__isset.data_page_header_v2) {
- _get_uncompressed_levels(header->data_page_header_v2, _page_data);
+ // For uncompressed page, we may still need to extract v2 levels
+ std::vector<uint8_t> level_bytes;
+ Slice uncompressed_data;
+ RETURN_IF_ERROR(_page_reader->get_page_data(uncompressed_data));
+ if (header->__isset.data_page_header_v2) {
+ const tparquet::DataPageHeaderV2& header_v2 =
header->data_page_header_v2;
+ size_t rl = header_v2.repetition_levels_byte_length;
+ size_t dl = header_v2.definition_levels_byte_length;
+ size_t level_sz = rl + dl;
+ if (level_sz > 0) {
+ level_bytes.resize(level_sz);
+ memcpy(level_bytes.data(), uncompressed_data.data,
level_sz);
+ }
+ _get_uncompressed_levels(header_v2, uncompressed_data);
+ }
+ // copy page data out
+ _page_data = Slice(uncompressed_data.data, uncompressed_data.size);
+ // Optionally cache uncompressed data for uncompressed pages
+ if (_page_read_ctx.enable_parquet_file_page_cache &&
+ !config::disable_storage_page_cache &&
StoragePageCache::instance() != nullptr) {
+ _insert_page_into_cache(level_bytes, _page_data);
+ _chunk_statistics.page_cache_decompressed_write_counter += 1;
+ }
}
}
@@ -244,7 +355,6 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::load_page_data() {
_decoders[static_cast<int>(encoding)] = std::move(page_decoder);
_page_decoder = _decoders[static_cast<int>(encoding)].get();
}
- // Reset page data for each page
RETURN_IF_ERROR(_page_decoder->set_data(&_page_data));
_state = DATA_LOADED;
@@ -254,7 +364,7 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::load_page_data() {
template <bool IN_COLLECTION, bool OFFSET_INDEX>
Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_decode_dict_page() {
const tparquet::PageHeader* header = nullptr;
- RETURN_IF_ERROR(_page_reader->get_page_header(header));
+ RETURN_IF_ERROR(_page_reader->get_page_header(&header));
DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type);
SCOPED_RAW_TIMER(&_chunk_statistics.decode_dict_time);
@@ -271,16 +381,84 @@ Status ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::_decode_dict_page() {
// Prepare dictionary data
int32_t uncompressed_size = header->uncompressed_page_size;
auto dict_data = make_unique_buffer<uint8_t>(uncompressed_size);
- if (_block_compress_codec != nullptr) {
- Slice compressed_data;
- RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
- Slice dict_slice(dict_data.get(), uncompressed_size);
- RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data,
&dict_slice));
- } else {
- Slice dict_slice;
- RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice));
- // The data is stored by BufferedStreamReader, we should copy it out
- memcpy(dict_data.get(), dict_slice.data, dict_slice.size);
+ bool dict_loaded = false;
+
+ // Try to load dictionary page from cache
+ if (_page_read_ctx.enable_parquet_file_page_cache &&
!config::disable_storage_page_cache &&
+ StoragePageCache::instance() != nullptr) {
+ if (_page_reader->has_page_cache_handle()) {
+ const PageCacheHandle& handle = _page_reader->page_cache_handle();
+ Slice cached = handle.data();
+ size_t header_size = _page_reader->header_bytes().size();
+ // Dictionary page layout in cache: header | payload (compressed
or uncompressed)
+ Slice payload_slice(cached.data + header_size, cached.size -
header_size);
+
+ bool cache_payload_is_decompressed =
_page_reader->is_cache_payload_decompressed();
+
+ if (cache_payload_is_decompressed) {
+ // Use cached decompressed dictionary data
+ memcpy(dict_data.get(), payload_slice.data,
payload_slice.size);
+ dict_loaded = true;
+ } else {
+ CHECK(_block_compress_codec);
+ // Decompress cached compressed dictionary data
+ Slice dict_slice(dict_data.get(), uncompressed_size);
+
RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &dict_slice));
+ dict_loaded = true;
+ }
+
+ // When dictionary page is loaded from cache, we need to skip the
page data
+ // to update the offset correctly (similar to calling
get_page_data())
+ if (dict_loaded) {
+ _page_reader->skip_page_data();
+ }
+ }
+ }
+
+ if (!dict_loaded) {
+ // Load and decompress dictionary page from file
+ if (_block_compress_codec != nullptr) {
+ Slice compressed_data;
+ RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
+ Slice dict_slice(dict_data.get(), uncompressed_size);
+ RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data,
&dict_slice));
+
+ // Decide whether to cache decompressed or compressed dictionary
based on threshold
+ bool cache_payload_decompressed =
should_cache_decompressed(header, _metadata);
+
+ if (_page_read_ctx.enable_parquet_file_page_cache &&
+ !config::disable_storage_page_cache &&
StoragePageCache::instance() != nullptr &&
+ !_page_reader->header_bytes().empty()) {
+ std::vector<uint8_t> empty_levels; // Dictionary pages don't
have levels
+ if (cache_payload_decompressed) {
+ // Cache the decompressed dictionary page
+ _insert_page_into_cache(empty_levels, dict_slice);
+ _chunk_statistics.page_cache_decompressed_write_counter +=
1;
+ } else {
+ if (config::enable_parquet_cache_compressed_pages) {
+ // Cache the compressed dictionary page
+ _insert_page_into_cache(empty_levels,
+ Slice(compressed_data.data,
compressed_data.size));
+ _chunk_statistics.page_cache_compressed_write_counter
+= 1;
+ }
+ }
+ }
+ } else {
+ Slice dict_slice;
+ RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice));
+ // The data is stored by BufferedStreamReader, we should copy it
out
+ memcpy(dict_data.get(), dict_slice.data, dict_slice.size);
+
+ // Cache the uncompressed dictionary page
+ if (_page_read_ctx.enable_parquet_file_page_cache &&
+ !config::disable_storage_page_cache &&
StoragePageCache::instance() != nullptr &&
+ !_page_reader->header_bytes().empty()) {
+ std::vector<uint8_t> empty_levels;
+ Slice payload(dict_data.get(), uncompressed_size);
+ _insert_page_into_cache(empty_levels, payload);
+ _chunk_statistics.page_cache_decompressed_write_counter += 1;
+ }
+ }
}
// Cache page decoder
@@ -306,6 +484,32 @@ void ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>::_reserve_decompress_buf(siz
}
}
+template <bool IN_COLLECTION, bool OFFSET_INDEX>
+void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_insert_page_into_cache(
+        const std::vector<uint8_t>& level_bytes, const Slice& payload) {
+    // Cache entry layout: [thrift page header][v2 rep+def level bytes (optional)][payload].
+    // PageReader::parse_page_header() deserializes the page header from the front of a cached
+    // entry on a hit, so an entry without header bytes could never be read back. Skip it
+    // (the dictionary-page callers already guard on this; the data-page caller does not).
+    const std::vector<uint8_t>& header_bytes = _page_reader->header_bytes();
+    if (header_bytes.empty()) {
+        return;
+    }
+
+    // Key the entry by the file offset where this page's header starts.
+    StoragePageCache::CacheKey key =
+            _page_reader->make_page_cache_key(_page_reader->header_start_offset());
+
+    const size_t total = header_bytes.size() + level_bytes.size() + payload.size;
+    auto page = std::make_unique<DataPage>(total, true, segment_v2::DATA_PAGE);
+    size_t pos = 0;
+    memcpy(page->data() + pos, header_bytes.data(), header_bytes.size());
+    pos += header_bytes.size();
+    if (!level_bytes.empty()) {
+        memcpy(page->data() + pos, level_bytes.data(), level_bytes.size());
+        pos += level_bytes.size();
+    }
+    if (payload.size > 0) {
+        memcpy(page->data() + pos, payload.data, payload.size);
+        pos += payload.size;
+    }
+    // Every byte of the entry must have been written exactly once.
+    DCHECK_EQ(pos, total);
+    page->reset_size(total);
+
+    PageCacheHandle handle;
+    StoragePageCache::instance()->insert(key, page.get(), &handle, segment_v2::DATA_PAGE);
+    // The unique_ptr is released unconditionally after insert(); presumably the cache now owns
+    // the page buffer (same pattern as other StoragePageCache users) -- confirm if insert() can fail.
+    page.release();
+    _chunk_statistics.page_cache_write_counter += 1;
+}
+
template <bool IN_COLLECTION, bool OFFSET_INDEX>
Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::skip_values(size_t
num_values,
bool
skip_data) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 9e77a3139f6..d0bf7ab2d81 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -61,20 +61,19 @@ struct ColumnChunkReaderStatistics {
int64_t skip_page_header_num = 0;
int64_t parse_page_header_num = 0;
int64_t read_page_header_time = 0;
+ int64_t page_read_counter = 0;
+ int64_t page_cache_write_counter = 0;
+ int64_t page_cache_compressed_write_counter = 0;
+ int64_t page_cache_decompressed_write_counter = 0;
+ int64_t page_cache_hit_counter = 0;
+ int64_t page_cache_missing_counter = 0;
+ int64_t page_cache_compressed_hit_counter = 0;
+ int64_t page_cache_decompressed_hit_counter = 0;
};
/**
* Read and decode parquet column data into doris block column.
- * <p>Usage:</p> struct ColumnChunkReaderStatistics {
- int64_t decompress_time = 0;
- int64_t decompress_cnt = 0;
- int64_t decode_header_time = 0;
- int64_t decode_value_time = 0;
- int64_t decode_dict_time = 0;
- int64_t decode_level_time = 0;
- int64_t skip_page_header_num = 0;
- int64_t parse_page_header_num = 0;
- };
+ * <p>Usage:</p>
* // Create chunk reader
* ColumnChunkReader chunk_reader(BufferedStreamReader* reader,
* tparquet::ColumnChunk* column_chunk,
@@ -97,7 +96,8 @@ class ColumnChunkReader {
public:
ColumnChunkReader(io::BufferedStreamReader* reader, tparquet::ColumnChunk*
column_chunk,
FieldSchema* field_schema, const tparquet::OffsetIndex*
offset_index,
- size_t total_row, io::IOContext* io_ctx);
+ size_t total_row, io::IOContext* io_ctx,
+ const ParquetPageReadContext& page_read_ctx);
~ColumnChunkReader() = default;
// Initialize chunk reader, will generate the decoder and codec.
@@ -155,6 +155,21 @@ public:
_page_reader->page_statistics().parse_page_header_num;
_chunk_statistics.read_page_header_time =
_page_reader->page_statistics().read_page_header_time;
+ _chunk_statistics.page_read_counter +=
_page_reader->page_statistics().page_read_counter;
+ _chunk_statistics.page_cache_write_counter +=
+ _page_reader->page_statistics().page_cache_write_counter;
+ _chunk_statistics.page_cache_compressed_write_counter +=
+
_page_reader->page_statistics().page_cache_compressed_write_counter;
+ _chunk_statistics.page_cache_decompressed_write_counter +=
+
_page_reader->page_statistics().page_cache_decompressed_write_counter;
+ _chunk_statistics.page_cache_hit_counter +=
+ _page_reader->page_statistics().page_cache_hit_counter;
+ _chunk_statistics.page_cache_missing_counter +=
+ _page_reader->page_statistics().page_cache_missing_counter;
+ _chunk_statistics.page_cache_compressed_hit_counter +=
+
_page_reader->page_statistics().page_cache_compressed_hit_counter;
+ _chunk_statistics.page_cache_decompressed_hit_counter +=
+
_page_reader->page_statistics().page_cache_decompressed_hit_counter;
return _chunk_statistics;
}
@@ -193,6 +208,11 @@ public:
size_t* result_rows, bool* cross_page);
Status load_cross_page_nested_row(std::vector<level_t>& rep_levels, bool*
cross_page);
+    // Accessors exposing the current page's payload slice and the v2 level slices.
+    Slice get_page_data() const {
+        return _page_data;
+    }
+    const Slice& v2_rep_levels() const {
+        return _v2_rep_levels;
+    }
+    const Slice& v2_def_levels() const {
+        return _v2_def_levels;
+    }
+    // Same object as chunk_statistics(), which refreshes it from the page reader first.
+    // NOTE(review): chunk_statistics() folds the page-reader page-cache counters in with `+=`
+    // on every call, so invoking statistics()/chunk_statistics() more than once per chunk
+    // appears to double-count them -- confirm callers call it only once.
+    ColumnChunkReaderStatistics& statistics() {
+        return chunk_statistics();
+    }
+
private:
enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED,
DATA_LOADED, PAGE_SKIPPED };
@@ -202,6 +222,7 @@ private:
void _reserve_decompress_buf(size_t size);
int32_t _get_type_length();
+ void _insert_page_into_cache(const std::vector<uint8_t>& level_bytes,
const Slice& payload);
void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2,
Slice& page_data);
Status _skip_nested_rows_in_page(size_t num_rows);
@@ -234,6 +255,8 @@ private:
std::unique_ptr<PageReader<IN_COLLECTION, OFFSET_INDEX>> _page_reader;
BlockCompressionCodec* _block_compress_codec = nullptr;
+ ParquetPageReadContext _page_read_ctx;
+
LevelDecoder _rep_level_decoder;
LevelDecoder _def_level_decoder;
size_t _chunk_parsed_values = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 30962632662..656e599a962 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -110,13 +110,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr
file, FieldSchema* field,
std::unique_ptr<ParquetColumnReader>&
reader,
size_t max_buf_size,
std::unordered_map<int,
tparquet::OffsetIndex>& col_offsets,
- bool in_collection, const
std::set<uint64_t>& column_ids,
+ RuntimeState* state, bool in_collection,
+ const std::set<uint64_t>& column_ids,
const std::set<uint64_t>&
filter_column_ids) {
size_t total_rows = row_group.num_rows;
if (field->data_type->get_primitive_type() == TYPE_ARRAY) {
std::unique_ptr<ParquetColumnReader> element_reader;
RETURN_IF_ERROR(create(file, &field->children[0], row_group,
row_ranges, ctz, io_ctx,
- element_reader, max_buf_size, col_offsets,
true, column_ids,
+ element_reader, max_buf_size, col_offsets,
state, true, column_ids,
filter_column_ids));
auto array_reader = ArrayColumnReader::create_unique(row_ranges,
total_rows, ctz, io_ctx);
element_reader->set_column_in_nested();
@@ -130,7 +131,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file,
FieldSchema* field,
column_ids.find(field->children[0].get_column_id()) !=
column_ids.end()) {
// Create key reader
RETURN_IF_ERROR(create(file, &field->children[0], row_group,
row_ranges, ctz, io_ctx,
- key_reader, max_buf_size, col_offsets,
true, column_ids,
+ key_reader, max_buf_size, col_offsets,
state, true, column_ids,
filter_column_ids));
} else {
auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges,
total_rows, ctz,
@@ -142,7 +143,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file,
FieldSchema* field,
column_ids.find(field->children[1].get_column_id()) !=
column_ids.end()) {
// Create value reader
RETURN_IF_ERROR(create(file, &field->children[1], row_group,
row_ranges, ctz, io_ctx,
- value_reader, max_buf_size, col_offsets,
true, column_ids,
+ value_reader, max_buf_size, col_offsets,
state, true, column_ids,
filter_column_ids));
} else {
auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges,
total_rows, ctz,
@@ -165,8 +166,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file,
FieldSchema* field,
std::unique_ptr<ParquetColumnReader> child_reader;
if (column_ids.empty() || column_ids.find(child.get_column_id())
!= column_ids.end()) {
RETURN_IF_ERROR(create(file, &child, row_group, row_ranges,
ctz, io_ctx,
- child_reader, max_buf_size,
col_offsets, in_collection,
- column_ids, filter_column_ids));
+ child_reader, max_buf_size,
col_offsets, state,
+ in_collection, column_ids,
filter_column_ids));
child_readers[child.name] = std::move(child_reader);
// Record the first non-SkippingReader
if (non_skip_reader_idx == -1) {
@@ -184,7 +185,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file,
FieldSchema* field,
if (non_skip_reader_idx == -1) {
std::unique_ptr<ParquetColumnReader> child_reader;
RETURN_IF_ERROR(create(file, &field->children[0], row_group,
row_ranges, ctz, io_ctx,
- child_reader, max_buf_size, col_offsets,
in_collection,
+ child_reader, max_buf_size, col_offsets,
state, in_collection,
column_ids, filter_column_ids));
child_reader->set_column_in_nested();
child_readers[field->children[0].name] = std::move(child_reader);
@@ -205,14 +206,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr
file, FieldSchema* field,
auto scalar_reader = ScalarColumnReader<true,
false>::create_unique(
row_ranges, total_rows, chunk, offset_index, ctz,
io_ctx);
- RETURN_IF_ERROR(scalar_reader->init(file, field,
max_buf_size));
+ RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size,
state));
scalar_reader->_filter_column_ids = filter_column_ids;
reader.reset(scalar_reader.release());
} else {
auto scalar_reader = ScalarColumnReader<true,
true>::create_unique(
row_ranges, total_rows, chunk, offset_index, ctz,
io_ctx);
- RETURN_IF_ERROR(scalar_reader->init(file, field,
max_buf_size));
+ RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size,
state));
scalar_reader->_filter_column_ids = filter_column_ids;
reader.reset(scalar_reader.release());
}
@@ -221,14 +222,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr
file, FieldSchema* field,
auto scalar_reader = ScalarColumnReader<false,
false>::create_unique(
row_ranges, total_rows, chunk, offset_index, ctz,
io_ctx);
- RETURN_IF_ERROR(scalar_reader->init(file, field,
max_buf_size));
+ RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size,
state));
scalar_reader->_filter_column_ids = filter_column_ids;
reader.reset(scalar_reader.release());
} else {
auto scalar_reader = ScalarColumnReader<false,
true>::create_unique(
row_ranges, total_rows, chunk, offset_index, ctz,
io_ctx);
- RETURN_IF_ERROR(scalar_reader->init(file, field,
max_buf_size));
+ RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size,
state));
scalar_reader->_filter_column_ids = filter_column_ids;
reader.reset(scalar_reader.release());
}
@@ -246,7 +247,8 @@ void ParquetColumnReader::_generate_read_ranges(RowRange
page_row_range,
template <bool IN_COLLECTION, bool OFFSET_INDEX>
Status ScalarColumnReader<IN_COLLECTION,
OFFSET_INDEX>::init(io::FileReaderSPtr file,
FieldSchema*
field,
- size_t
max_buf_size) {
+ size_t
max_buf_size,
+ RuntimeState*
state) {
_field_schema = field;
auto& chunk_meta = _chunk_meta.meta_data;
int64_t chunk_start = has_dict_page(chunk_meta) ?
chunk_meta.dictionary_page_offset
@@ -262,8 +264,11 @@ Status ScalarColumnReader<IN_COLLECTION,
OFFSET_INDEX>::init(io::FileReaderSPtr
}
_stream_reader = std::make_unique<io::BufferedFileStreamReader>(file,
chunk_start, chunk_len,
prefetch_buffer_size);
+ ParquetPageReadContext ctx(
+ (state == nullptr) ? true :
state->query_options().enable_parquet_file_page_cache);
+
_chunk_reader = std::make_unique<ColumnChunkReader<IN_COLLECTION,
OFFSET_INDEX>>(
- _stream_reader.get(), &_chunk_meta, field, _offset_index,
_total_rows, _io_ctx);
+ _stream_reader.get(), &_chunk_meta, field, _offset_index,
_total_rows, _io_ctx, ctx);
RETURN_IF_ERROR(_chunk_reader->init());
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 2a21ddd84cb..97ad9d1fd3a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -65,7 +65,15 @@ public:
decode_null_map_time(0),
skip_page_header_num(0),
parse_page_header_num(0),
- read_page_header_time(0) {}
+ read_page_header_time(0),
+ page_read_counter(0),
+ page_cache_write_counter(0),
+ page_cache_compressed_write_counter(0),
+ page_cache_decompressed_write_counter(0),
+ page_cache_hit_counter(0),
+ page_cache_missing_counter(0),
+ page_cache_compressed_hit_counter(0),
+ page_cache_decompressed_hit_counter(0) {}
ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t
null_map_time)
: page_index_read_calls(0),
@@ -78,7 +86,15 @@ public:
decode_null_map_time(null_map_time),
skip_page_header_num(cs.skip_page_header_num),
parse_page_header_num(cs.parse_page_header_num),
- read_page_header_time(cs.read_page_header_time) {}
+ read_page_header_time(cs.read_page_header_time),
+ page_read_counter(cs.page_read_counter),
+ page_cache_write_counter(cs.page_cache_write_counter),
+
page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter),
+
page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter),
+ page_cache_hit_counter(cs.page_cache_hit_counter),
+ page_cache_missing_counter(cs.page_cache_missing_counter),
+
page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter),
+
page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter) {}
int64_t page_index_read_calls;
int64_t decompress_time;
@@ -91,6 +107,14 @@ public:
int64_t skip_page_header_num;
int64_t parse_page_header_num;
int64_t read_page_header_time;
+ int64_t page_read_counter;
+ int64_t page_cache_write_counter;
+ int64_t page_cache_compressed_write_counter;
+ int64_t page_cache_decompressed_write_counter;
+ int64_t page_cache_hit_counter;
+ int64_t page_cache_missing_counter;
+ int64_t page_cache_compressed_hit_counter;
+ int64_t page_cache_decompressed_hit_counter;
void merge(ColumnStatistics& col_statistics) {
page_index_read_calls += col_statistics.page_index_read_calls;
@@ -104,6 +128,17 @@ public:
skip_page_header_num += col_statistics.skip_page_header_num;
parse_page_header_num += col_statistics.parse_page_header_num;
read_page_header_time += col_statistics.read_page_header_time;
+ page_read_counter += col_statistics.page_read_counter;
+ page_cache_write_counter +=
col_statistics.page_cache_write_counter;
+ page_cache_compressed_write_counter +=
+ col_statistics.page_cache_compressed_write_counter;
+ page_cache_decompressed_write_counter +=
+ col_statistics.page_cache_decompressed_write_counter;
+ page_cache_hit_counter += col_statistics.page_cache_hit_counter;
+ page_cache_missing_counter +=
col_statistics.page_cache_missing_counter;
+ page_cache_compressed_hit_counter +=
col_statistics.page_cache_compressed_hit_counter;
+ page_cache_decompressed_hit_counter +=
+ col_statistics.page_cache_decompressed_hit_counter;
}
};
@@ -132,7 +167,8 @@ public:
cctz::time_zone* ctz, io::IOContext* io_ctx,
std::unique_ptr<ParquetColumnReader>& reader, size_t
max_buf_size,
std::unordered_map<int, tparquet::OffsetIndex>&
col_offsets,
- bool in_collection = false, const std::set<uint64_t>&
column_ids = {},
+ RuntimeState* state, bool in_collection = false,
+ const std::set<uint64_t>& column_ids = {},
const std::set<uint64_t>& filter_column_ids = {});
virtual const std::vector<level_t>& get_rep_level() const = 0;
virtual const std::vector<level_t>& get_def_level() const = 0;
@@ -175,7 +211,8 @@ public:
_chunk_meta(chunk_meta),
_offset_index(offset_index) {}
~ScalarColumnReader() override { close(); }
- Status init(io::FileReaderSPtr file, FieldSchema* field, size_t
max_buf_size);
+ Status init(io::FileReaderSPtr file, FieldSchema* field, size_t
max_buf_size,
+ RuntimeState* state);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
const
std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
FilterMap& filter_map, size_t batch_size, size_t*
read_rows, bool* eof,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 543c8c44bd4..12ac55016ca 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -133,7 +133,7 @@ Status RowGroupReader::init(
std::unique_ptr<ParquetColumnReader> reader;
RETURN_IF_ERROR(ParquetColumnReader::create(
_file_reader, field, _row_group_meta, _read_ranges, _ctz,
_io_ctx, reader,
- max_buf_size, col_offsets, false, _column_ids,
_filter_column_ids));
+ max_buf_size, col_offsets, _state, false, _column_ids,
_filter_column_ids));
if (reader == nullptr) {
VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader
failed";
return Status::Corruption("Init row group reader failed");
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 3b6d7fdcb9b..70fdccd04a6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -17,6 +17,7 @@
#include "vparquet_page_reader.h"
+#include <fmt/format.h>
#include <gen_cpp/parquet_types.h>
#include <stddef.h>
#include <stdint.h>
@@ -26,6 +27,8 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "io/fs/buffered_reader.h"
+#include "olap/page_cache.h"
+#include "parquet_common.h"
#include "util/runtime_profile.h"
#include "util/slice.h"
#include "util/thrift_util.h"
@@ -40,10 +43,16 @@ namespace doris::vectorized {
#include "common/compile_check_begin.h"
static constexpr size_t INIT_PAGE_HEADER_SIZE = 128;
+void ParquetPageCacheKeyBuilder::init(const std::string& path, int64_t mtime) {
+    // File-level key prefix "<path>::<mtime>"; presumably the mtime component keeps pages of a
+    // file rewritten in place from matching stale cache entries.
+    std::string prefix = fmt::format("{}::{}", path, mtime);
+    _file_key_prefix.swap(prefix);
+}
+
template <bool IN_COLLECTION, bool OFFSET_INDEX>
PageReader<IN_COLLECTION, OFFSET_INDEX>::PageReader(io::BufferedStreamReader*
reader,
io::IOContext* io_ctx,
uint64_t offset,
uint64_t length, size_t
total_rows,
+ const
tparquet::ColumnMetaData& metadata,
+ const
ParquetPageReadContext& page_read_ctx,
const
tparquet::OffsetIndex* offset_index)
: _reader(reader),
_io_ctx(io_ctx),
@@ -51,9 +60,12 @@ PageReader<IN_COLLECTION,
OFFSET_INDEX>::PageReader(io::BufferedStreamReader* re
_start_offset(offset),
_end_offset(offset + length),
_total_rows(total_rows),
+ _metadata(metadata),
+ _page_read_ctx(page_read_ctx),
_offset_index(offset_index) {
_next_header_offset = _offset;
_state = INITIALIZED;
+ _page_cache_key_builder.init(_reader->path(), _reader->mtime());
if constexpr (OFFSET_INDEX) {
_end_row = _offset_index->page_locations.size() >= 2
@@ -77,11 +89,72 @@ Status PageReader<IN_COLLECTION,
OFFSET_INDEX>::parse_page_header() {
return Status::IOError("Should skip or load current page to get next
page");
}
+ _page_statistics.page_read_counter += 1;
+
+ // Parse page header from file; header bytes are saved for possible cache
insertion
const uint8_t* page_header_buf = nullptr;
size_t max_size = _end_offset - _offset;
size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb <<
20;
uint32_t real_header_size = 0;
+
+ // Try a header-only lookup in the page cache. Cached pages store
+ // header + optional v2 levels + uncompressed payload, so we can
+ // parse the page header directly from the cached bytes and avoid
+ // a file read for the header.
+ if (_page_read_ctx.enable_parquet_file_page_cache &&
!config::disable_storage_page_cache &&
+ StoragePageCache::instance() != nullptr) {
+ PageCacheHandle handle;
+ StoragePageCache::CacheKey key =
make_page_cache_key(static_cast<int64_t>(_offset));
+ if (StoragePageCache::instance()->lookup(key, &handle,
segment_v2::DATA_PAGE)) {
+ // Parse header directly from cached data
+ _page_cache_handle = std::move(handle);
+ Slice s = _page_cache_handle.data();
+ real_header_size = cast_set<uint32_t>(s.size);
+ SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
+ auto st = deserialize_thrift_msg(reinterpret_cast<const
uint8_t*>(s.data),
+ &real_header_size, true,
&_cur_page_header);
+ if (!st.ok()) return st;
+ // Increment page cache counters for a true cache hit on
header+payload
+ _page_statistics.page_cache_hit_counter += 1;
+ // Detect whether the cached payload is compressed or decompressed
and record
+ bool is_cache_payload_decompressed =
+ should_cache_decompressed(&_cur_page_header, _metadata);
+
+ if (is_cache_payload_decompressed) {
+ _page_statistics.page_cache_decompressed_hit_counter += 1;
+ } else {
+ _page_statistics.page_cache_compressed_hit_counter += 1;
+ }
+
+ _is_cache_payload_decompressed = is_cache_payload_decompressed;
+
+ if constexpr (OFFSET_INDEX == false) {
+ if (is_header_v2()) {
+ _end_row = _start_row +
_cur_page_header.data_page_header_v2.num_rows;
+ } else if constexpr (!IN_COLLECTION) {
+ _end_row = _start_row +
_cur_page_header.data_page_header.num_values;
+ }
+ }
+
+ // Save header bytes for later use (e.g., to insert updated cache
entries)
+ _header_buf.assign(s.data, s.data + real_header_size);
+ _last_header_size = real_header_size;
+ _page_statistics.parse_page_header_num++;
+ _offset += real_header_size;
+ _next_header_offset = _offset +
_cur_page_header.compressed_page_size;
+ _state = HEADER_PARSED;
+ return Status::OK();
+ } else {
+ _page_statistics.page_cache_missing_counter += 1;
+ // Clear any existing cache handle on miss to avoid holding stale
handle
+ _page_cache_handle = PageCacheHandle();
+ }
+ }
+ // NOTE: page cache lookup for *decompressed* page data is handled in
+ // ColumnChunkReader::load_page_data(). PageReader should only be
+ // responsible for parsing the header bytes from the file and saving
+ // them in `_header_buf` for possible later insertion into the cache.
while (true) {
if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
return Status::EndOfFile("stop");
@@ -115,6 +188,9 @@ Status PageReader<IN_COLLECTION,
OFFSET_INDEX>::parse_page_header() {
}
}
+ // Save header bytes for possible cache insertion later
+ _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
+ _last_header_size = real_header_size;
_page_statistics.parse_page_header_num++;
_offset += real_header_size;
_next_header_offset = _offset + _cur_page_header.compressed_page_size;
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index 9246819d59c..9aa5ba3171c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -21,9 +21,12 @@
#include <stdint.h>
#include <memory>
+#include <string>
#include "common/cast_set.h"
+#include "common/config.h"
#include "common/status.h"
+#include "olap/page_cache.h"
#include "util/block_compression.h"
#include "vec/exec/format/parquet/parquet_common.h"
namespace doris {
@@ -50,6 +53,35 @@ namespace doris::vectorized {
* Use to deserialize parquet page header, and get the page data in iterator
interface.
*/
+// Per-query switches controlling how parquet pages are read and cached
+// (populated from the query options when a scalar column reader is initialized).
+struct ParquetPageReadContext {
+    // Whether pages may be looked up in, and inserted into, the storage page cache.
+    bool enable_parquet_file_page_cache = true;
+
+    ParquetPageReadContext() = default;
+    // NOTE(review): not `explicit`, so a bare bool converts implicitly to a context; kept that
+    // way for source compatibility with existing callers -- consider `explicit` later.
+    ParquetPageReadContext(bool enable_cache) : enable_parquet_file_page_cache(enable_cache) {}
+};
+
+// Chooses the payload format for a page-cache entry: true = cache decompressed bytes,
+// false = cache the raw compressed bytes. Pages with no compressed size, or from an
+// UNCOMPRESSED chunk, are always treated as decompressed. Otherwise decompressed caching is
+// chosen only while the expansion ratio uncompressed/compressed stays within
+// config::parquet_page_cache_decompress_threshold.
+// Both the cache writer (ColumnChunkReader) and the cache-hit path (PageReader) call this to
+// infer the payload format. NOTE(review): if that threshold can change at runtime, entries
+// written under the old value could be misread -- verify the config is immutable.
+inline bool should_cache_decompressed(const tparquet::PageHeader* header,
+                                      const tparquet::ColumnMetaData& metadata) {
+    const bool no_codec_work = header->compressed_page_size <= 0 ||
+                               metadata.codec == tparquet::CompressionCodec::UNCOMPRESSED;
+    if (no_codec_work) {
+        return true;
+    }
+    const double uncompressed = static_cast<double>(header->uncompressed_page_size);
+    const double compressed = static_cast<double>(header->compressed_page_size);
+    return uncompressed / compressed <= config::parquet_page_cache_decompress_threshold;
+}
+
+// Builds StoragePageCache keys for parquet pages from a file-level prefix (set by init()),
+// the column chunk's end offset and the offset of the page's header.
+class ParquetPageCacheKeyBuilder {
+public:
+    void init(const std::string& path, int64_t mtime);
+
+    StoragePageCache::CacheKey make_key(uint64_t end_offset, int64_t offset) const {
+        StoragePageCache::CacheKey key(_file_key_prefix, end_offset, offset);
+        return key;
+    }
+
+private:
+    // File-level prefix produced by init().
+    std::string _file_key_prefix;
+};
+
template <bool IN_COLLECTION, bool OFFSET_INDEX>
class PageReader {
public:
@@ -58,10 +90,19 @@ public:
int64_t skip_page_header_num = 0;
int64_t parse_page_header_num = 0;
int64_t read_page_header_time = 0;
+ int64_t page_cache_hit_counter = 0;
+ int64_t page_cache_missing_counter = 0;
+ int64_t page_cache_compressed_hit_counter = 0;
+ int64_t page_cache_decompressed_hit_counter = 0;
+ int64_t page_cache_write_counter = 0;
+ int64_t page_cache_compressed_write_counter = 0;
+ int64_t page_cache_decompressed_write_counter = 0;
+ int64_t page_read_counter = 0;
};
PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx,
uint64_t offset,
- uint64_t length, size_t total_rows,
+ uint64_t length, size_t total_rows, const
tparquet::ColumnMetaData& metadata,
+ const ParquetPageReadContext& page_read_ctx,
const tparquet::OffsetIndex* offset_index = nullptr);
~PageReader() = default;
@@ -123,24 +164,53 @@ public:
}
}
- Status get_page_header(const tparquet::PageHeader*& page_header) {
+    // Exposes the header parsed by the last parse_page_header() through *page_header.
+    // The pointee is this reader's _cur_page_header and is overwritten by the next parse.
+    // Returns InternalError when no header is currently in the parsed state.
+ Status get_page_header(const tparquet::PageHeader** page_header) {
if (UNLIKELY(_state != HEADER_PARSED)) {
return Status::InternalError("Page header not parsed");
}
- page_header = &_cur_page_header;
+ *page_header = &_cur_page_header;
return Status::OK();
}
Status get_page_data(Slice& slice);
+    // Advances past the current page's payload without reading it (used when the payload was
+    // served from the page cache). Does nothing unless a header is currently parsed.
+    void skip_page_data() {
+        if (_state != HEADER_PARSED) {
+            return;
+        }
+        _offset += _cur_page_header.compressed_page_size;
+        _state = DATA_LOADED;
+    }
+
+    // Raw thrift bytes of the most recently parsed page header.
+    const std::vector<uint8_t>& header_bytes() const {
+        return _header_buf;
+    }
+    // File offset where the current page's header begins:
+    // next header offset - this header's size - this page's compressed payload size.
+    int64_t header_start_offset() const {
+        const auto next = static_cast<int64_t>(_next_header_offset);
+        const auto header_len = static_cast<int64_t>(_last_header_size);
+        const auto payload_len = static_cast<int64_t>(_cur_page_header.compressed_page_size);
+        return next - header_len - payload_len;
+    }
+    // Exclusive end offset of this column chunk (offset + length), despite the "file" name.
+    uint64_t file_end_offset() const {
+        return _end_offset;
+    }
+    // Whether the current page would be cached decompressed under the current threshold.
+    bool cached_decompressed() const {
+        const tparquet::PageHeader* current = &_cur_page_header;
+        return should_cache_decompressed(current, _metadata);
+    }
+
PageStatistics& page_statistics() { return _page_statistics; }
bool is_header_v2() { return _cur_page_header.__isset.data_page_header_v2;
}
+    // True when the cached entry backing the current page holds decompressed payload bytes;
+    // recorded on a page-cache hit in parse_page_header().
+    bool is_cache_payload_decompressed() const {
+        return _is_cache_payload_decompressed;
+    }
+
size_t start_row() const { return _start_row; }
size_t end_row() const { return _end_row; }
+    // Page-cache handle accessors; a handle is held only after a cache hit on the current page.
+    bool has_page_cache_handle() const {
+        return nullptr != _page_cache_handle.cache();
+    }
+    const doris::PageCacheHandle& page_cache_handle() const {
+        return _page_cache_handle;
+    }
+    // Cache key for a page whose header starts at `offset` within this chunk's file.
+    StoragePageCache::CacheKey make_page_cache_key(int64_t offset) const {
+        const uint64_t chunk_end = _end_offset;
+        return _page_cache_key_builder.make_key(chunk_end, offset);
+    }
+
private:
enum PageReaderState { INITIALIZED, HEADER_PARSED, DATA_LOADED };
PageReaderState _state = INITIALIZED;
@@ -159,19 +229,33 @@ private:
size_t _end_row = 0;
// total rows in this column chunk
size_t _total_rows = 0;
+ // Column metadata for this column chunk
+ const tparquet::ColumnMetaData& _metadata;
+ // Session-level parquet page cache options
+ ParquetPageReadContext _page_read_ctx;
// for page index
size_t _page_index = 0;
const tparquet::OffsetIndex* _offset_index;
tparquet::PageHeader _cur_page_header;
+ bool _is_cache_payload_decompressed = true;
+
+ // Page cache members
+ ParquetPageCacheKeyBuilder _page_cache_key_builder;
+ doris::PageCacheHandle _page_cache_handle;
+ // stored header bytes when cache miss so we can insert header+payload
into cache
+ std::vector<uint8_t> _header_buf;
+ // last parsed header size in bytes
+ uint32_t _last_header_size = 0;
};
template <bool IN_COLLECTION, bool OFFSET_INDEX>
std::unique_ptr<PageReader<IN_COLLECTION, OFFSET_INDEX>> create_page_reader(
io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t
offset, uint64_t length,
- size_t total_rows, const tparquet::OffsetIndex* offset_index =
nullptr) {
- return std::make_unique<PageReader<IN_COLLECTION, OFFSET_INDEX>>(reader,
io_ctx, offset, length,
-
total_rows, offset_index);
+ size_t total_rows, const tparquet::ColumnMetaData& metadata,
+ const ParquetPageReadContext& ctx, const tparquet::OffsetIndex*
offset_index = nullptr) {
+ return std::make_unique<PageReader<IN_COLLECTION, OFFSET_INDEX>>(
+ reader, io_ctx, offset, length, total_rows, metadata, ctx,
offset_index);
}
#include "common/compile_check_end.h"
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index a84ee2b9d2c..de99ad2170c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -194,6 +194,22 @@ void ParquetReader::_init_profile() {
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecompressTime",
parquet_profile, 1);
_parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL(
_profile, "DecompressCount", TUnit::UNIT, parquet_profile, 1);
+ _parquet_profile.page_read_counter = ADD_CHILD_COUNTER_WITH_LEVEL(
+ _profile, "PageReadCount", TUnit::UNIT, parquet_profile, 1);
+ _parquet_profile.page_cache_write_counter =
ADD_CHILD_COUNTER_WITH_LEVEL(
+ _profile, "PageCacheWriteCount", TUnit::UNIT, parquet_profile,
1);
+ _parquet_profile.page_cache_compressed_write_counter =
ADD_CHILD_COUNTER_WITH_LEVEL(
+ _profile, "PageCacheCompressedWriteCount", TUnit::UNIT,
parquet_profile, 1);
+ _parquet_profile.page_cache_decompressed_write_counter =
ADD_CHILD_COUNTER_WITH_LEVEL(
+ _profile, "PageCacheDecompressedWriteCount", TUnit::UNIT,
parquet_profile, 1);
+ _parquet_profile.page_cache_hit_counter = ADD_CHILD_COUNTER_WITH_LEVEL(
+ _profile, "PageCacheHitCount", TUnit::UNIT, parquet_profile,
1);
+ _parquet_profile.page_cache_missing_counter =
ADD_CHILD_COUNTER_WITH_LEVEL(
+ _profile, "PageCacheMissingCount", TUnit::UNIT,
parquet_profile, 1);
+ _parquet_profile.page_cache_compressed_hit_counter =
ADD_CHILD_COUNTER_WITH_LEVEL(
+ _profile, "PageCacheCompressedHitCount", TUnit::UNIT,
parquet_profile, 1);
+ _parquet_profile.page_cache_decompressed_hit_counter =
ADD_CHILD_COUNTER_WITH_LEVEL(
+ _profile, "PageCacheDecompressedHitCount", TUnit::UNIT,
parquet_profile, 1);
_parquet_profile.decode_header_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageHeaderDecodeTime",
parquet_profile, 1);
_parquet_profile.read_page_header_time =
@@ -1271,6 +1287,21 @@ void ParquetReader::_collect_profile() {
_column_statistics.page_index_read_calls);
COUNTER_UPDATE(_parquet_profile.decompress_time,
_column_statistics.decompress_time);
COUNTER_UPDATE(_parquet_profile.decompress_cnt,
_column_statistics.decompress_cnt);
+ COUNTER_UPDATE(_parquet_profile.page_read_counter,
_column_statistics.page_read_counter);
+ COUNTER_UPDATE(_parquet_profile.page_cache_write_counter,
+ _column_statistics.page_cache_write_counter);
+ COUNTER_UPDATE(_parquet_profile.page_cache_compressed_write_counter,
+ _column_statistics.page_cache_compressed_write_counter);
+ COUNTER_UPDATE(_parquet_profile.page_cache_decompressed_write_counter,
+ _column_statistics.page_cache_decompressed_write_counter);
+ COUNTER_UPDATE(_parquet_profile.page_cache_hit_counter,
+ _column_statistics.page_cache_hit_counter);
+ COUNTER_UPDATE(_parquet_profile.page_cache_missing_counter,
+ _column_statistics.page_cache_missing_counter);
+ COUNTER_UPDATE(_parquet_profile.page_cache_compressed_hit_counter,
+ _column_statistics.page_cache_compressed_hit_counter);
+ COUNTER_UPDATE(_parquet_profile.page_cache_decompressed_hit_counter,
+ _column_statistics.page_cache_decompressed_hit_counter);
COUNTER_UPDATE(_parquet_profile.decode_header_time,
_column_statistics.decode_header_time);
COUNTER_UPDATE(_parquet_profile.read_page_header_time,
_column_statistics.read_page_header_time);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index b6451ad15d4..dcd9e99f393 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -195,6 +195,14 @@ private:
RuntimeProfile::Counter* file_footer_hit_cache = nullptr;
RuntimeProfile::Counter* decompress_time = nullptr;
RuntimeProfile::Counter* decompress_cnt = nullptr;
+ RuntimeProfile::Counter* page_read_counter = nullptr;
+ RuntimeProfile::Counter* page_cache_write_counter = nullptr;
+ RuntimeProfile::Counter* page_cache_compressed_write_counter = nullptr;
+ RuntimeProfile::Counter* page_cache_decompressed_write_counter =
nullptr;
+ RuntimeProfile::Counter* page_cache_hit_counter = nullptr;
+ RuntimeProfile::Counter* page_cache_missing_counter = nullptr;
+ RuntimeProfile::Counter* page_cache_compressed_hit_counter = nullptr;
+ RuntimeProfile::Counter* page_cache_decompressed_hit_counter = nullptr;
RuntimeProfile::Counter* decode_header_time = nullptr;
RuntimeProfile::Counter* read_page_header_time = nullptr;
RuntimeProfile::Counter* decode_value_time = nullptr;
diff --git a/be/test/io/fs/buffered_reader_test.cpp
b/be/test/io/fs/buffered_reader_test.cpp
index 3874b06c68c..bc92d22b178 100644
--- a/be/test/io/fs/buffered_reader_test.cpp
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -68,6 +68,8 @@ public:
bool closed() const override { return _reader->closed(); }
+ int64_t mtime() const override { return _reader->mtime(); }
+
private:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const io::IOContext* io_ctx) override {
@@ -96,6 +98,8 @@ public:
bool closed() const override { return _closed; }
+ int64_t mtime() const override { return 0; }
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const io::IOContext* io_ctx) override {
@@ -130,6 +134,8 @@ public:
bool closed() const override { return _delegate->closed(); }
+ int64_t mtime() const override { return _delegate->mtime(); }
+
const io::PrefetchRange& last_read_range() const { return
*_last_read_range; }
protected:
diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp
b/be/test/io/fs/packed_file_concurrency_test.cpp
index 31c2db19fb8..1e581ff59b8 100644
--- a/be/test/io/fs/packed_file_concurrency_test.cpp
+++ b/be/test/io/fs/packed_file_concurrency_test.cpp
@@ -411,6 +411,8 @@ public:
bool closed() const override { return _closed; }
+ int64_t mtime() const override { return 0; }
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) override {
diff --git a/be/test/io/fs/packed_file_reader_test.cpp
b/be/test/io/fs/packed_file_reader_test.cpp
index feaf5c37f3d..2b0472c38a5 100644
--- a/be/test/io/fs/packed_file_reader_test.cpp
+++ b/be/test/io/fs/packed_file_reader_test.cpp
@@ -69,6 +69,8 @@ protected:
return Status::OK();
}
+ int64_t mtime() const override { return 0; }
+
private:
Path _path = Path("mock_file");
std::string _content;
diff --git a/be/test/io/fs/packed_file_system_test.cpp
b/be/test/io/fs/packed_file_system_test.cpp
index 7f8a4af63df..096bb8c4a93 100644
--- a/be/test/io/fs/packed_file_system_test.cpp
+++ b/be/test/io/fs/packed_file_system_test.cpp
@@ -53,6 +53,8 @@ public:
bool closed() const override { return _closed; }
+ int64_t mtime() const override { return 0; }
+
protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* /*io_ctx*/) override {
diff --git a/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
index 3aef8db8459..5ea6335d2b9 100644
--- a/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
+++ b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
@@ -38,6 +38,8 @@ public:
bool closed() const override { return _closed; }
+ int64_t mtime() const override { return 0; }
+
Status close() override {
_closed = true;
return Status::OK();
diff --git a/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp
b/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp
new file mode 100644
index 00000000000..c415d264e8e
--- /dev/null
+++ b/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp
@@ -0,0 +1,804 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <fmt/format.h>
+#include <gtest/gtest.h>
+
+#include "common/config.h"
+#include "io/fs/buffered_reader.h"
+#include "olap/page_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/cache_manager.h"
+#include "util/block_compression.h"
+#include "util/faststring.h"
+#include "util/thrift_util.h"
+#include "vec/exec/format/parquet/schema_desc.h"
+#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
+#include "vec/exec/format/parquet/vparquet_page_reader.h"
+
+using namespace doris;
+using namespace doris::vectorized;
+
+// Minimal in-memory io::BufferedStreamReader for these tests: reads return pointers into
+// `_data` (no copy), and mtime() is always 0, matching the mtime the tests use when they
+// build page cache keys by hand.
+class FakeBufferedReader : public io::BufferedStreamReader {
+public:
+    FakeBufferedReader(std::string path, std::vector<uint8_t> data)
+            : _path(std::move(path)), _data(std::move(data)) {}
+
+    // Point `*buf` at `_data[offset]`; fails if [offset, offset + bytes_to_read) is out of range.
+    Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read,
+                      const doris::io::IOContext* /*io_ctx*/) override {
+        if (!_in_bounds(offset, bytes_to_read)) {
+            return Status::IOError("Out of bounds");
+        }
+        *buf = _data.data() + offset;
+        return Status::OK();
+    }
+
+    // Point `slice.data` at `_data[offset]`; `slice.size` is the requested length.
+    Status read_bytes(Slice& slice, uint64_t offset,
+                      const doris::io::IOContext* /*io_ctx*/) override {
+        if (!_in_bounds(offset, slice.size)) {
+            return Status::IOError("Out of bounds");
+        }
+        slice.data = reinterpret_cast<char*>(_data.data() + offset);
+        return Status::OK();
+    }
+
+    std::string path() override { return _path; }
+
+    int64_t mtime() const override { return 0; }
+
+private:
+    // Overflow-safe range check: the naive `offset + len > size` can wrap around for very
+    // large offsets and wrongly report an in-bounds read.
+    bool _in_bounds(uint64_t offset, size_t len) const {
+        return offset <= _data.size() && len <= _data.size() - offset;
+    }
+
+    std::string _path;
+    std::vector<uint8_t> _data;
+};
+
+// A page placed directly into StoragePageCache (serialized header + uncompressed payload)
+// is returned by ColumnChunkReader::load_page_data() as a decompressed cache hit.
+TEST(ParquetPageCacheTest, CacheHitReturnsDecompressedPayload) {
+    ParquetPageReadContext read_ctx;
+    read_ctx.enable_parquet_file_page_cache = true;
+
+    // Serialized thrift header for an uncompressed 4-byte v1 data page.
+    tparquet::PageHeader page_header;
+    page_header.type = tparquet::PageType::DATA_PAGE;
+    page_header.__set_compressed_page_size(4);
+    page_header.__set_uncompressed_page_size(4);
+    page_header.__isset.data_page_header = true;
+    page_header.data_page_header.__set_num_values(1);
+    std::vector<uint8_t> page_bytes;
+    ThriftSerializer serializer(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(serializer.serialize(&page_header, &page_bytes).ok());
+
+    // Cached entry layout: header bytes immediately followed by the payload.
+    std::vector<uint8_t> values = {0x11, 0x22, 0x33, 0x44};
+    page_bytes.insert(page_bytes.end(), values.begin(), values.end());
+    const size_t page_len = page_bytes.size();
+
+    const std::string file_path = "test_parquet_cache_file";
+    const int64_t page_offset = 128;
+    // The key's end offset equals data_page_offset + total_compressed_size configured below.
+    const int64_t chunk_end = page_offset + static_cast<int64_t>(page_len);
+    const int64_t file_mtime = 0;
+
+    // Pre-populate the cache.
+    StoragePageCache::CacheKey cache_key(fmt::format("{}::{}", file_path, file_mtime),
+                                         static_cast<size_t>(chunk_end), page_offset);
+    auto* cached_page = new DataPage(page_len, true, segment_v2::DATA_PAGE);
+    memcpy(cached_page->data(), page_bytes.data(), page_len);
+    cached_page->reset_size(page_len);
+    PageCacheHandle cache_handle;
+    StoragePageCache::instance()->insert(cache_key, cached_page, &cache_handle,
+                                         segment_v2::DATA_PAGE);
+
+    // The "file" also holds the page at page_offset so header parsing succeeds.
+    std::vector<uint8_t> file_bytes(256, 0);
+    memcpy(file_bytes.data() + page_offset, page_bytes.data(), page_len);
+    FakeBufferedReader stream(file_path, file_bytes);
+
+    tparquet::ColumnChunk chunk;
+    chunk.meta_data.__set_data_page_offset(page_offset);
+    chunk.meta_data.__set_total_compressed_size(page_len);
+    chunk.meta_data.__set_num_values(1);
+    chunk.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    FieldSchema schema;
+    schema.repetition_level = 0;
+    schema.definition_level = 0;
+
+    ColumnChunkReader<false, false> chunk_reader(&stream, &chunk, &schema, nullptr, 0, nullptr,
+                                                 read_ctx);
+    ASSERT_TRUE(chunk_reader.init().ok());
+    ASSERT_TRUE(chunk_reader.load_page_data().ok());
+    const Slice page = chunk_reader.get_page_data();
+    ASSERT_EQ(page.size, values.size());
+    ASSERT_EQ(0, memcmp(page.data, values.data(), values.size()));
+
+    // Exactly one page read, served from the cache as a decompressed hit.
+    const auto& stats = chunk_reader.statistics();
+    EXPECT_EQ(stats.page_read_counter, 1);
+    EXPECT_EQ(stats.page_cache_hit_counter, 1);
+    EXPECT_EQ(stats.page_cache_decompressed_hit_counter, 1);
+}
+
+// A page read through ColumnChunkReader is inserted into the page cache, so a second reader
+// over the same chunk is served from the cache.
+TEST(ParquetPageCacheTest, DecompressedPageInsertedByColumnChunkReader) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+
+    // Ensure decompressed pages are cached via BE config. The guard's destructor restores the
+    // previous values on every exit path, including the early return of a failed ASSERT_*,
+    // so a failure here cannot leak modified config into later tests. It is declared first,
+    // so it is destroyed last, after the readers below.
+    struct ConfigRestorer {
+        double thresh = config::parquet_page_cache_decompress_threshold;
+        bool enable_compressed = config::enable_parquet_cache_compressed_pages;
+        ~ConfigRestorer() {
+            config::parquet_page_cache_decompress_threshold = thresh;
+            config::enable_parquet_cache_compressed_pages = enable_compressed;
+        }
+    } config_restorer;
+    config::parquet_page_cache_decompress_threshold = 100.0;
+    config::enable_parquet_cache_compressed_pages = false;
+
+    // construct uncompressed header + payload in file buffer
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    header.__set_compressed_page_size(4);
+    header.__set_uncompressed_page_size(4);
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> payload = {0x55, 0x66, 0x77, 0x88};
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end());
+    file_data.insert(file_data.end(), payload.begin(), payload.end());
+
+    std::string path = "test_parquet_insert_file";
+    int64_t header_offset = 0;
+
+    FakeBufferedReader reader(path, file_data);
+
+    // prepare column chunk metadata
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(header_offset);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    FieldSchema field_schema;
+    field_schema.repetition_level = 0;
+    field_schema.definition_level = 0;
+
+    // First reader loads the page from the file, which should insert it into the cache.
+    ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr.init().ok());
+    ASSERT_TRUE(ccr.load_page_data().ok());
+
+    // Now cache should have an entry; verify by creating a fresh ColumnChunkReader and
+    // hitting cache
+    ColumnChunkReader<false, false> ccr_check(&reader, &cc, &field_schema, nullptr, 0, nullptr,
+                                              ctx);
+    ASSERT_TRUE(ccr_check.init().ok());
+    ASSERT_TRUE(ccr_check.load_page_data().ok());
+    Slice s = ccr_check.get_page_data();
+    ASSERT_EQ(s.size, payload.size());
+    EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+    EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1);
+}
+
+// DATA_PAGE_V2 (uncompressed): after a cache round trip, the repetition/definition level bytes
+// are still exposed via v2_rep_levels()/v2_def_levels() and the payload is intact.
+TEST(ParquetPageCacheTest, V2LevelsPreservedInCache) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // Ensure decompressed pages are cached via BE config. The guard's destructor restores the
+    // previous values on every exit path, including the early return of a failed ASSERT_*.
+    struct ConfigRestorer {
+        double thresh = config::parquet_page_cache_decompress_threshold;
+        bool enable_compressed = config::enable_parquet_cache_compressed_pages;
+        ~ConfigRestorer() {
+            config::parquet_page_cache_decompress_threshold = thresh;
+            config::enable_parquet_cache_compressed_pages = enable_compressed;
+        }
+    } config_restorer;
+    config::parquet_page_cache_decompress_threshold = 100.0;
+    config::enable_parquet_cache_compressed_pages = false;
+
+    // construct v2 header + levels + payload in file buffer (uncompressed)
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE_V2;
+    int rl = 2;
+    int dl = 1;
+    int payload_sz = 2;
+    header.__set_compressed_page_size(rl + dl + payload_sz);
+    header.__set_uncompressed_page_size(rl + dl + payload_sz);
+    header.__isset.data_page_header_v2 = true;
+    header.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+    header.data_page_header_v2.__set_definition_levels_byte_length(dl);
+    header.data_page_header_v2.__set_is_compressed(false);
+    header.data_page_header_v2.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+    std::vector<uint8_t> payload = {0xAA, 0xBB};
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end());
+    file_data.insert(file_data.end(), level_bytes.begin(), level_bytes.end());
+    file_data.insert(file_data.end(), payload.begin(), payload.end());
+
+    std::string path = "test_v2_levels_file";
+    FakeBufferedReader reader(path, file_data);
+
+    // prepare column chunk metadata
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    FieldSchema field_schema;
+    field_schema.repetition_level = 0;
+    field_schema.definition_level = 0;
+    {
+        ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx);
+        ASSERT_TRUE(ccr.init().ok());
+        ASSERT_TRUE(ccr.load_page_data().ok());
+
+        // Now cache should have entry; verify by creating a ColumnChunkReader and hitting cache
+        ColumnChunkReader<false, false> ccr_check(&reader, &cc, &field_schema, nullptr, 0,
+                                                  nullptr, ctx);
+        ASSERT_TRUE(ccr_check.init().ok());
+        ASSERT_TRUE(ccr_check.load_page_data().ok());
+        Slice s = ccr_check.get_page_data();
+        ASSERT_EQ(s.size, payload.size());
+        EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+    }
+
+    // Verify that a fresh ColumnChunkReader reusing cache gets level bytes preserved
+    FieldSchema field_schema2;
+    field_schema2.repetition_level = 2; // v2 levels present
+    field_schema2.definition_level = 1;
+    ColumnChunkReader<false, false> ccr2(&reader, &cc, &field_schema2, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr2.init().ok());
+    ASSERT_TRUE(ccr2.load_page_data().ok());
+    // Level slices should equal the original level bytes
+    const Slice& rep = ccr2.v2_rep_levels();
+    const Slice& def = ccr2.v2_def_levels();
+    auto& statistics = ccr2.statistics();
+    EXPECT_GT(statistics.page_cache_hit_counter, 0);
+    // because threshold is set to cache decompressed, we should see decompressed hits
+    EXPECT_GT(statistics.page_cache_decompressed_hit_counter, 0);
+    // Check both sizes before the memcmp calls so neither can read past a shorter slice.
+    ASSERT_EQ(rep.size, static_cast<size_t>(rl));
+    ASSERT_EQ(def.size, static_cast<size_t>(dl));
+    EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl));
+    EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl));
+}
+
+// SNAPPY-compressed v1 data page: the first reader decompresses the page and writes it to the
+// page cache; a second reader over the same chunk is served from the cache.
+TEST(ParquetPageCacheTest, CompressedV1PageCachedAndHit) {
+    ParquetPageReadContext read_ctx;
+    read_ctx.enable_parquet_file_page_cache = true;
+
+    // Compress a tiny payload with snappy.
+    std::vector<uint8_t> values = {0x01, 0x02, 0x03, 0x04};
+    BlockCompressionCodec* snappy = nullptr;
+    ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &snappy).ok());
+    std::vector<Slice> raw_slices;
+    raw_slices.emplace_back(values.data(), values.size());
+    faststring compressed;
+    ASSERT_TRUE(snappy->compress(raw_slices, values.size(), &compressed).ok());
+
+    // v1 header carrying both the compressed and the uncompressed size.
+    tparquet::PageHeader page_header;
+    page_header.type = tparquet::PageType::DATA_PAGE;
+    page_header.__isset.data_page_header = true;
+    page_header.data_page_header.__set_num_values(1);
+    page_header.__set_compressed_page_size(static_cast<int32_t>(compressed.size()));
+    page_header.__set_uncompressed_page_size(static_cast<int32_t>(values.size()));
+
+    // File layout: serialized header followed by the compressed payload.
+    std::vector<uint8_t> file_bytes;
+    ThriftSerializer serializer(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(serializer.serialize(&page_header, &file_bytes).ok());
+    file_bytes.insert(file_bytes.end(), compressed.data(), compressed.data() + compressed.size());
+
+    FakeBufferedReader stream("test_compressed_v1_file", file_bytes);
+
+    tparquet::ColumnChunk chunk;
+    chunk.meta_data.__set_data_page_offset(0);
+    chunk.meta_data.__set_total_compressed_size(file_bytes.size());
+    chunk.meta_data.__set_num_values(1);
+    chunk.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+    FieldSchema schema;
+    schema.repetition_level = 0;
+    schema.definition_level = 0;
+
+    // First load triggers decompression and a cache insert.
+    ColumnChunkReader<false, false> first(&stream, &chunk, &schema, nullptr, 0, nullptr, read_ctx);
+    ASSERT_TRUE(first.init().ok());
+    ASSERT_TRUE(first.load_page_data().ok());
+    EXPECT_EQ(first.statistics().page_cache_write_counter, 1);
+
+    // A fresh reader must hit the cache and see the original (uncompressed) values.
+    ColumnChunkReader<false, false> second(&stream, &chunk, &schema, nullptr, 0, nullptr,
+                                           read_ctx);
+    ASSERT_TRUE(second.init().ok());
+    ASSERT_TRUE(second.load_page_data().ok());
+    const Slice page = second.get_page_data();
+    ASSERT_EQ(page.size, values.size());
+    EXPECT_EQ(0, memcmp(page.data, values.data(), values.size()));
+    EXPECT_EQ(second.statistics().page_cache_hit_counter, 1);
+}
+
+// DATA_PAGE_V2 with a SNAPPY-compressed payload: the level bytes stay uncompressed in front of
+// the compressed payload, and must be intact after the page is cached and read back.
+TEST(ParquetPageCacheTest, CompressedV2LevelsPreservedInCache) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // Pin the BE config so the page is cached decompressed, which the decompressed-hit check
+    // below expects, instead of depending on defaults or on state left by earlier tests.
+    // These are the same settings DecompressedPageInsertedByColumnChunkReader uses. The
+    // guard's destructor restores the previous values on every exit path.
+    struct ConfigRestorer {
+        double thresh = config::parquet_page_cache_decompress_threshold;
+        bool enable_compressed = config::enable_parquet_cache_compressed_pages;
+        ~ConfigRestorer() {
+            config::parquet_page_cache_decompress_threshold = thresh;
+            config::enable_parquet_cache_compressed_pages = enable_compressed;
+        }
+    } config_restorer;
+    config::parquet_page_cache_decompress_threshold = 100.0;
+    config::enable_parquet_cache_compressed_pages = false;
+
+    // construct v2 header + levels + compressed payload in file buffer
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE_V2;
+    int rl = 2;
+    int dl = 1;
+    header.__isset.data_page_header_v2 = true;
+    header.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+    header.data_page_header_v2.__set_definition_levels_byte_length(dl);
+    header.data_page_header_v2.__set_is_compressed(true);
+    header.data_page_header_v2.__set_num_values(1);
+
+    std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+    std::vector<uint8_t> payload = {0xAA, 0xBB};
+
+    // compress payload
+    BlockCompressionCodec* codec = nullptr;
+    ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &codec).ok());
+    faststring compressed_fast;
+    std::vector<Slice> inputs;
+    inputs.emplace_back(payload.data(), payload.size());
+    ASSERT_TRUE(codec->compress(inputs, payload.size(), &compressed_fast).ok());
+
+    // compressed page: levels (uncompressed) followed by compressed payload
+    std::vector<uint8_t> compressed_page;
+    compressed_page.insert(compressed_page.end(), level_bytes.begin(), level_bytes.end());
+    compressed_page.insert(compressed_page.end(), compressed_fast.data(),
+                           compressed_fast.data() + compressed_fast.size());
+
+    header.__set_compressed_page_size(static_cast<int32_t>(compressed_page.size()));
+    header.__set_uncompressed_page_size(static_cast<int32_t>(level_bytes.size() + payload.size()));
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end());
+    file_data.insert(file_data.end(), compressed_page.begin(), compressed_page.end());
+
+    std::string path = "test_compressed_v2_file";
+    FakeBufferedReader reader(path, file_data);
+
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+    FieldSchema field_schema;
+    field_schema.repetition_level = 0;
+    field_schema.definition_level = 0;
+
+    // Load page to trigger decompression + cache insert
+    ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr.init().ok());
+    ASSERT_TRUE(ccr.load_page_data().ok());
+    EXPECT_EQ(ccr.statistics().page_cache_write_counter, 1);
+
+    // Now verify a fresh reader hits the cache and v2 levels are preserved
+    FieldSchema field_schema2;
+    field_schema2.repetition_level = rl;
+    field_schema2.definition_level = dl;
+    ColumnChunkReader<false, false> ccr_check(&reader, &cc, &field_schema2, nullptr, 0, nullptr,
+                                              ctx);
+    ASSERT_TRUE(ccr_check.init().ok());
+    ASSERT_TRUE(ccr_check.load_page_data().ok());
+    Slice s = ccr_check.get_page_data();
+    ASSERT_EQ(s.size, payload.size());
+    EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+    const Slice& rep = ccr_check.v2_rep_levels();
+    const Slice& def = ccr_check.v2_def_levels();
+    ASSERT_EQ(rep.size, static_cast<size_t>(rl));
+    ASSERT_EQ(def.size, static_cast<size_t>(dl));
+    // cached v2 page is stored decompressed (threshold pinned to 100 above)
+    EXPECT_GT(ccr_check.statistics().page_cache_decompressed_hit_counter, 0);
+    EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl));
+    EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl));
+}
+
+// Two uncompressed pages of different formats (v1 and v2), pre-inserted into the page cache
+// under distinct header offsets, must each be returned intact by their ColumnChunkReader.
+TEST(ParquetPageCacheTest, MultiPagesMixedV1V2CacheHit) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+
+    // Prepare a v1 uncompressed page and a v2 uncompressed page and insert both into cache
+    std::string path = "test_multi_pages_file";
+
+    // v1 page
+    tparquet::PageHeader hdr1;
+    hdr1.type = tparquet::PageType::DATA_PAGE;
+    hdr1.__set_compressed_page_size(4);
+    hdr1.__set_uncompressed_page_size(4);
+    hdr1.__isset.data_page_header = true;
+    hdr1.data_page_header.__set_num_values(1);
+    std::vector<uint8_t> header1_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&hdr1, &header1_bytes).ok());
+    std::vector<uint8_t> payload1 = {0x10, 0x20, 0x30, 0x40};
+    std::vector<uint8_t> cached1;
+    cached1.insert(cached1.end(), header1_bytes.begin(), header1_bytes.end());
+    cached1.insert(cached1.end(), payload1.begin(), payload1.end());
+
+    // v2 page
+    tparquet::PageHeader hdr2;
+    hdr2.type = tparquet::PageType::DATA_PAGE_V2;
+    int rl = 2;
+    int dl = 1;
+    int payload2_sz = 2;
+    hdr2.__set_compressed_page_size(rl + dl + payload2_sz);
+    hdr2.__set_uncompressed_page_size(rl + dl + payload2_sz);
+    hdr2.__isset.data_page_header_v2 = true;
+    hdr2.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+    hdr2.data_page_header_v2.__set_definition_levels_byte_length(dl);
+    hdr2.data_page_header_v2.__set_is_compressed(false);
+    hdr2.data_page_header_v2.__set_num_values(1);
+    std::vector<uint8_t> header2_bytes;
+    ASSERT_TRUE(ts.serialize(&hdr2, &header2_bytes).ok());
+    std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+    std::vector<uint8_t> payload2 = {0xAA, 0xBB};
+    std::vector<uint8_t> cached2;
+    cached2.insert(cached2.end(), header2_bytes.begin(), header2_bytes.end());
+    cached2.insert(cached2.end(), level_bytes.begin(), level_bytes.end());
+    cached2.insert(cached2.end(), payload2.begin(), payload2.end());
+
+    // Insert both pages into cache under different header offsets. Each key's end offset is
+    // header_start + total, which equals data_page_offset + total_compressed_size of the
+    // matching column chunk configured below.
+    int64_t mtime = 0;
+    size_t total1 = cached1.size();
+    size_t header1_start = 128;
+    auto* page1 = new DataPage(total1, true, segment_v2::DATA_PAGE);
+    memcpy(page1->data(), cached1.data(), total1);
+    page1->reset_size(total1);
+    PageCacheHandle h1;
+    StoragePageCache::CacheKey key1(fmt::format("{}::{}", path, mtime),
+                                    static_cast<size_t>(header1_start + total1), header1_start);
+    StoragePageCache::instance()->insert(key1, page1, &h1, segment_v2::DATA_PAGE);
+
+    size_t total2 = cached2.size();
+    size_t header2_start = 256;
+    auto* page2 = new DataPage(total2, true, segment_v2::DATA_PAGE);
+    memcpy(page2->data(), cached2.data(), total2);
+    page2->reset_size(total2);
+    PageCacheHandle h2;
+    StoragePageCache::CacheKey key2(fmt::format("{}::{}", path, mtime),
+                                    static_cast<size_t>(header2_start + total2), header2_start);
+    StoragePageCache::instance()->insert(key2, page2, &h2, segment_v2::DATA_PAGE);
+
+    // Now create readers that would look up those cache keys.
+    // Reader1 must expose header+page bytes at offset header1_start.
+    std::vector<uint8_t> reader_backing1(3000, 0);
+    ASSERT_LE(header1_start + total1, reader_backing1.size());
+    memcpy(reader_backing1.data() + header1_start, cached1.data(), total1);
+    FakeBufferedReader reader1(path, reader_backing1);
+    tparquet::ColumnChunk cc1;
+    // Derived from header1_start so the chunk offset cannot drift from the cache key.
+    cc1.meta_data.__set_data_page_offset(static_cast<int64_t>(header1_start));
+    cc1.meta_data.__set_total_compressed_size(total1);
+    cc1.meta_data.__set_num_values(1);
+    cc1.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+    FieldSchema field_schema1;
+    field_schema1.repetition_level = 0;
+    field_schema1.definition_level = 0;
+    ColumnChunkReader<false, false> ccr1(&reader1, &cc1, &field_schema1, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr1.init().ok());
+    ASSERT_TRUE(ccr1.load_page_data().ok());
+    Slice s1 = ccr1.get_page_data();
+    ASSERT_EQ(s1.size, payload1.size());
+    EXPECT_EQ(0, memcmp(s1.data, payload1.data(), payload1.size()));
+
+    // Reader2 must expose header+page bytes at offset header2_start.
+    std::vector<uint8_t> reader_backing2(3000, 0);
+    ASSERT_LE(header2_start + total2, reader_backing2.size());
+    memcpy(reader_backing2.data() + header2_start, cached2.data(), total2);
+    FakeBufferedReader reader2(path, reader_backing2);
+    tparquet::ColumnChunk cc2;
+    // Derived from header2_start so the chunk offset cannot drift from the cache key.
+    cc2.meta_data.__set_data_page_offset(static_cast<int64_t>(header2_start));
+    cc2.meta_data.__set_total_compressed_size(total2);
+    cc2.meta_data.__set_num_values(1);
+    cc2.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+    FieldSchema field_schema2;
+    field_schema2.repetition_level = rl;
+    field_schema2.definition_level = dl;
+    ColumnChunkReader<false, false> ccr2(&reader2, &cc2, &field_schema2, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr2.init().ok());
+    ASSERT_TRUE(ccr2.load_page_data().ok());
+    Slice s2 = ccr2.get_page_data();
+    ASSERT_EQ(s2.size, payload2.size());
+    EXPECT_EQ(0, memcmp(s2.data, payload2.data(), payload2.size()));
+    const Slice& rep = ccr2.v2_rep_levels();
+    const Slice& def = ccr2.v2_def_levels();
+    ASSERT_EQ(rep.size, static_cast<size_t>(rl));
+    ASSERT_EQ(def.size, static_cast<size_t>(dl));
+    EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl));
+    EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl));
+}
+
+// The first reader over this file misses the cache and writes the page; a second reader over
+// the same chunk is then served from the cache as a decompressed hit.
+TEST(ParquetPageCacheTest, CacheMissThenHit) {
+    ParquetPageReadContext page_ctx;
+    page_ctx.enable_parquet_file_page_cache = true;
+
+    // Serialized header of an uncompressed v1 page with a 4-byte payload.
+    tparquet::PageHeader page_header;
+    page_header.type = tparquet::PageType::DATA_PAGE;
+    page_header.__set_compressed_page_size(4);
+    page_header.__set_uncompressed_page_size(4);
+    page_header.__isset.data_page_header = true;
+    page_header.data_page_header.__set_num_values(1);
+    std::vector<uint8_t> serialized_header;
+    ThriftSerializer serializer(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(serializer.serialize(&page_header, &serialized_header).ok());
+
+    // page bytes = header followed by payload
+    const std::vector<uint8_t> value_bytes = {0xDE, 0xAD, 0xBE, 0xEF};
+    std::vector<uint8_t> page_bytes(serialized_header);
+    page_bytes.insert(page_bytes.end(), value_bytes.begin(), value_bytes.end());
+
+    // Place the page at a non-zero offset inside a zero-filled 256-byte "file".
+    const int64_t page_offset = 64;
+    std::vector<uint8_t> file_bytes(256, 0);
+    memcpy(file_bytes.data() + page_offset, page_bytes.data(), page_bytes.size());
+    FakeBufferedReader stream("test_miss_then_hit", file_bytes);
+
+    tparquet::ColumnChunk chunk;
+    chunk.meta_data.__set_data_page_offset(page_offset);
+    chunk.meta_data.__set_total_compressed_size(page_bytes.size());
+    chunk.meta_data.__set_num_values(1);
+    chunk.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    FieldSchema schema;
+    schema.repetition_level = 0;
+    schema.definition_level = 0;
+
+    // Miss: the page is read from the file and written to the cache.
+    ColumnChunkReader<false, false> cold_reader(&stream, &chunk, &schema, nullptr, 0, nullptr,
+                                                page_ctx);
+    ASSERT_TRUE(cold_reader.init().ok());
+    ASSERT_TRUE(cold_reader.load_page_data().ok());
+    EXPECT_EQ(cold_reader.statistics().page_cache_hit_counter, 0);
+    EXPECT_EQ(cold_reader.statistics().page_cache_write_counter, 1);
+
+    // Hit: the same chunk is now served from the cache.
+    ColumnChunkReader<false, false> warm_reader(&stream, &chunk, &schema, nullptr, 0, nullptr,
+                                                page_ctx);
+    ASSERT_TRUE(warm_reader.init().ok());
+    ASSERT_TRUE(warm_reader.load_page_data().ok());
+    EXPECT_EQ(warm_reader.statistics().page_cache_hit_counter, 1);
+    EXPECT_EQ(warm_reader.statistics().page_cache_decompressed_hit_counter, 1);
+}
+
+TEST(ParquetPageCacheTest, DecompressThresholdCachesCompressed) {
+ ParquetPageReadContext ctx;
+ ctx.enable_parquet_file_page_cache = true;
+
+ // prepare a compressible payload (lots of zeros)
+ std::vector<uint8_t> payload(1024, 0);
+
+ // compress payload using snappy
+ BlockCompressionCodec* codec = nullptr;
+
ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY,
&codec).ok());
+ faststring compressed_fast;
+ std::vector<Slice> inputs;
+ inputs.emplace_back(payload.data(), payload.size());
+ ASSERT_TRUE(codec->compress(inputs, payload.size(),
&compressed_fast).ok());
+
+ tparquet::PageHeader header;
+ header.type = tparquet::PageType::DATA_PAGE;
+
header.__set_compressed_page_size(static_cast<int32_t>(compressed_fast.size()));
+ header.__set_uncompressed_page_size(static_cast<int32_t>(payload.size()));
+ header.__isset.data_page_header = true;
+ header.data_page_header.__set_num_values(1);
+
+ std::vector<uint8_t> header_bytes;
+ ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+ ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+ std::vector<uint8_t> file_data;
+ file_data.insert(file_data.end(), header_bytes.begin(),
header_bytes.end());
+ file_data.insert(file_data.end(), compressed_fast.data(),
+ compressed_fast.data() + compressed_fast.size());
+
+ std::string path = "test_threshold_file_compressed";
+ FakeBufferedReader reader(path, file_data);
+
+ tparquet::ColumnChunk cc;
+ cc.meta_data.__set_data_page_offset(0);
+ cc.meta_data.__set_total_compressed_size(file_data.size());
+ cc.meta_data.__set_num_values(1);
+ cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+ FieldSchema fs;
+ fs.repetition_level = 0;
+ fs.definition_level = 0;
+
+ // Case: very small threshold -> cache the compressed payload (smaller
footprint)
+ double old_thresh = config::parquet_page_cache_decompress_threshold;
+ bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+ config::parquet_page_cache_decompress_threshold = 0.1;
+ config::enable_parquet_cache_compressed_pages = true;
+ ColumnChunkReader<false, false> ccr_small_thresh(&reader, &cc, &fs,
nullptr, 0, nullptr, ctx);
+ ASSERT_TRUE(ccr_small_thresh.init().ok());
+ // ASSERT_TRUE(ccr_small_thresh.next_page().ok());
+ ASSERT_TRUE(ccr_small_thresh.load_page_data().ok());
+ EXPECT_EQ(ccr_small_thresh.statistics().page_cache_write_counter, 1);
+
+ // Inspect cache entry: payload stored should be compressed size
+ PageCacheHandle handle_small;
+ size_t file_end = header_bytes.size() + compressed_fast.size();
+ int64_t mtime = 0;
+ StoragePageCache::CacheKey key_small(fmt::format("{}::{}", path, mtime),
+ /*file_end_offset*/ file_end,
/*header_start*/ 0);
+ bool found_small =
+ StoragePageCache::instance()->lookup(key_small, &handle_small,
segment_v2::DATA_PAGE);
+ ASSERT_TRUE(found_small);
+ Slice cached_small = handle_small.data();
+ size_t header_size = header_bytes.size();
+ size_t payload_in_cache_size = cached_small.size - header_size; // no
levels here
+ ASSERT_EQ(payload_in_cache_size, compressed_fast.size());
+
+ // restore config
+ config::parquet_page_cache_decompress_threshold = old_thresh;
+ config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
+
+TEST(ParquetPageCacheTest, DecompressThresholdCachesDecompressed) {
+ ParquetPageReadContext ctx;
+ ctx.enable_parquet_file_page_cache = true;
+
+ // prepare a compressible payload (lots of zeros)
+ std::vector<uint8_t> payload(1024, 0);
+
+ // compress payload using snappy
+ BlockCompressionCodec* codec = nullptr;
+
ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY,
&codec).ok());
+ faststring compressed_fast;
+ std::vector<Slice> inputs;
+ inputs.emplace_back(payload.data(), payload.size());
+ ASSERT_TRUE(codec->compress(inputs, payload.size(),
&compressed_fast).ok());
+
+ tparquet::PageHeader header;
+ header.type = tparquet::PageType::DATA_PAGE;
+
header.__set_compressed_page_size(static_cast<int32_t>(compressed_fast.size()));
+ header.__set_uncompressed_page_size(static_cast<int32_t>(payload.size()));
+ header.__isset.data_page_header = true;
+ header.data_page_header.__set_num_values(1);
+
+ std::vector<uint8_t> header_bytes;
+ ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+ ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+ std::vector<uint8_t> file_data;
+ file_data.insert(file_data.end(), header_bytes.begin(),
header_bytes.end());
+ file_data.insert(file_data.end(), compressed_fast.data(),
+ compressed_fast.data() + compressed_fast.size());
+
+ std::string path = "test_threshold_file_decompressed";
+ FakeBufferedReader reader(path, file_data);
+
+ tparquet::ColumnChunk cc;
+ cc.meta_data.__set_data_page_offset(0);
+ cc.meta_data.__set_total_compressed_size(file_data.size());
+ cc.meta_data.__set_num_values(1);
+ cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+ FieldSchema fs;
+ fs.repetition_level = 0;
+ fs.definition_level = 0;
+
+ // Case: very large threshold -> cache decompressed payload
+ double old_thresh = config::parquet_page_cache_decompress_threshold;
+ bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+ config::parquet_page_cache_decompress_threshold = 100.0;
+ config::enable_parquet_cache_compressed_pages = false;
+ ColumnChunkReader<false, false> ccr_large_thresh(&reader, &cc, &fs,
nullptr, 0, nullptr, ctx);
+ ASSERT_TRUE(ccr_large_thresh.init().ok());
+ // ASSERT_TRUE(ccr_large_thresh.next_page().ok());
+ ASSERT_TRUE(ccr_large_thresh.load_page_data().ok());
+ EXPECT_EQ(ccr_large_thresh.statistics().page_cache_write_counter, 1);
+
+ // Inspect cache entry for large threshold: payload stored should be
uncompressed size
+ PageCacheHandle handle_large;
+ size_t file_end = header_bytes.size() + compressed_fast.size();
+ int64_t mtime = 0;
+ StoragePageCache::CacheKey key_large(fmt::format("{}::{}", path, mtime),
+ /*file_end_offset*/ file_end,
/*header_start*/ 0);
+ bool found_large =
+ StoragePageCache::instance()->lookup(key_large, &handle_large,
segment_v2::DATA_PAGE);
+ ASSERT_TRUE(found_large);
+ Slice cached_large = handle_large.data();
+ size_t payload_in_cache_size_large = cached_large.size -
header_bytes.size();
+ ASSERT_EQ(payload_in_cache_size_large, payload.size());
+
+ // Verify cache hit for a new reader (should hit the decompressed entry we
just created)
+ ColumnChunkReader<false, false> ccr_check(&reader, &cc, &fs, nullptr, 0,
nullptr, ctx);
+ ASSERT_TRUE(ccr_check.init().ok());
+ // ASSERT_TRUE(ccr_check.next_page().ok());
+ ASSERT_TRUE(ccr_check.load_page_data().ok());
+ EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1);
+ // restore config
+ config::parquet_page_cache_decompress_threshold = old_thresh;
+ config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
+
+TEST(ParquetPageCacheTest, MultipleReadersShareCachedEntry) {
+ ParquetPageReadContext ctx;
+ ctx.enable_parquet_file_page_cache = true;
+ double old_thresh = config::parquet_page_cache_decompress_threshold;
+ bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+ config::parquet_page_cache_decompress_threshold = 100.0;
+ config::enable_parquet_cache_compressed_pages = false;
+
+ // Create a v2 cached page and then instantiate multiple readers that hit
the cache
+ std::string path = "test_shared_handles";
+ tparquet::PageHeader hdr;
+ hdr.type = tparquet::PageType::DATA_PAGE_V2;
+ int rl = 2;
+ int dl = 1;
+ hdr.__isset.data_page_header_v2 = true;
+ hdr.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+ hdr.data_page_header_v2.__set_definition_levels_byte_length(dl);
+ hdr.data_page_header_v2.__set_is_compressed(false);
+ hdr.data_page_header_v2.__set_num_values(1);
+ std::vector<uint8_t> header_bytes;
+ ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+ ASSERT_TRUE(ts.serialize(&hdr, &header_bytes).ok());
+ std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+ std::vector<uint8_t> payload = {0x0A, 0x0B};
+ std::vector<uint8_t> cached;
+ cached.insert(cached.end(), header_bytes.begin(), header_bytes.end());
+ cached.insert(cached.end(), level_bytes.begin(), level_bytes.end());
+ cached.insert(cached.end(), payload.begin(), payload.end());
+
+ size_t total = cached.size();
+ auto* page = new DataPage(total, true, segment_v2::DATA_PAGE);
+ memcpy(page->data(), cached.data(), total);
+ page->reset_size(total);
+ PageCacheHandle handle;
+ size_t header_start = 512;
+ int64_t mtime = 0;
+ StoragePageCache::CacheKey key(fmt::format("{}::{}", path, mtime),
+ static_cast<size_t>(header_start + total),
header_start);
+ StoragePageCache::instance()->insert(key, page, &handle,
segment_v2::DATA_PAGE);
+
+ // Create multiple readers that will hit cache
+ const int N = 4;
+ for (int i = 0; i < N; ++i) {
+ std::vector<uint8_t> reader_backing(5000, 0);
+ memcpy(reader_backing.data() + header_start, cached.data(), total);
+ FakeBufferedReader reader(path, reader_backing);
+ tparquet::ColumnChunk cc;
+ cc.meta_data.__set_data_page_offset(512);
+ cc.meta_data.__set_total_compressed_size(total);
+ cc.meta_data.__set_num_values(1);
+ cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+ FieldSchema fs;
+ fs.repetition_level = rl;
+ fs.definition_level = dl;
+ ColumnChunkReader<false, false> ccr(&reader, &cc, &fs, nullptr, 0,
nullptr, ctx);
+ ASSERT_TRUE(ccr.init().ok());
+ ASSERT_TRUE(ccr.load_page_data().ok());
+ Slice s = ccr.get_page_data();
+ ASSERT_EQ(s.size, payload.size());
+ EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+ const Slice& rep = ccr.v2_rep_levels();
+ const Slice& def = ccr.v2_def_levels();
+ ASSERT_EQ(rep.size, rl);
+ ASSERT_EQ(def.size, dl);
+ }
+ // restore config
+ config::parquet_page_cache_decompress_threshold = old_thresh;
+ config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
diff --git a/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp
b/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp
index 1d9c62ebfad..e3695b2b545 100644
--- a/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp
@@ -197,8 +197,9 @@ static Status get_column_values(io::FileReaderSPtr
file_reader, tparquet::Column
io::BufferedFileStreamReader stream_reader(file_reader, start_offset,
chunk_size, 1024);
+ ParquetPageReadContext page_read_ctx;
ColumnChunkReader<false, false> chunk_reader(&stream_reader, column_chunk,
field_schema,
- nullptr, total_rows, nullptr);
+ nullptr, total_rows, nullptr,
page_read_ctx);
// initialize chunk reader
static_cast<void>(chunk_reader.init());
// seek to next page header
diff --git a/be/test/vec/exec/orc/orc_file_reader_test.cpp
b/be/test/vec/exec/orc/orc_file_reader_test.cpp
index 9e1003c397f..4c71129cdbb 100644
--- a/be/test/vec/exec/orc/orc_file_reader_test.cpp
+++ b/be/test/vec/exec/orc/orc_file_reader_test.cpp
@@ -41,6 +41,8 @@ public:
bool closed() const override { return _closed; }
+ int64_t mtime() const override { return 0; }
+
void set_data(const std::string& data) { _data = data; }
protected:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 576c3019144..2120dc5e73a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -506,6 +506,7 @@ public class SessionVariable implements Serializable,
Writable {
public static final String SHOW_USER_DEFAULT_ROLE =
"show_user_default_role";
public static final String ENABLE_PAGE_CACHE = "enable_page_cache";
+ public static final String ENABLE_PARQUET_FILE_PAGE_CACHE =
"enable_parquet_file_page_cache";
public static final String MINIDUMP_PATH = "minidump_path";
@@ -2232,6 +2233,13 @@ public class SessionVariable implements Serializable,
Writable {
needForward = true)
public boolean enablePageCache = true;
+ @VariableMgr.VarAttr(
+ name = ENABLE_PARQUET_FILE_PAGE_CACHE,
+ description = {"控制是否启用 Parquet file page cache。默认为 true。",
+ "Controls whether to use Parquet file page cache. The
default is true."},
+ needForward = true)
+ public boolean enableParquetFilePageCache = true;
+
@VariableMgr.VarAttr(name = ENABLE_FOLD_NONDETERMINISTIC_FN)
public boolean enableFoldNondeterministicFn = false;
@@ -5115,6 +5123,8 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnablePageCache(enablePageCache);
+ tResult.setEnableParquetFilePageCache(enableParquetFilePageCache);
+
tResult.setFileCacheBasePath(fileCacheBasePath);
tResult.setEnableInvertedIndexQuery(enableInvertedIndexQuery);
@@ -5129,6 +5139,8 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableOrcLazyMat(enableOrcLazyMat);
tResult.setEnableParquetFilterByMinMax(enableParquetFilterByMinMax);
tResult.setEnableParquetFilterByBloomFilter(enableParquetFilterByBloomFilter);
+
+ tResult.setEnableParquetFilePageCache(enableParquetFilePageCache);
tResult.setEnableOrcFilterByMinMax(enableOrcFilterByMinMax);
tResult.setCheckOrcInitSargsSuccess(checkOrcInitSargsSuccess);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 9fbc2058232..3f1c5feedb9 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -436,6 +436,8 @@ struct TQueryOptions {
// hash table expansion thresholds since all data is local.
202: optional bool single_backend_query = false;
+ 185: optional bool enable_parquet_file_page_cache = true;
+
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]