This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new e8f040fe3c3 branch-4.1: [feature](vparquet-reader) Implements parquet 
file page cache. (#59307) (#61477)
e8f040fe3c3 is described below

commit e8f040fe3c3da61c1166712dfe0fc8655f45a3ed
Author: Qi Chen <[email protected]>
AuthorDate: Thu Mar 19 10:52:44 2026 +0800

    branch-4.1: [feature](vparquet-reader) Implements parquet file page cache. 
(#59307) (#61477)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #59307
    
    Problem Summary:
    
    ### Release note
    
    Cherry-pick #59307
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason here -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
        https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/common/config.cpp                           |   6 +
 be/src/common/config.h                             |   6 +
 be/src/io/cache/cached_remote_file_reader.h        |   2 +
 be/src/io/file_factory.cpp                         |  12 +
 be/src/io/file_factory.h                           |   6 +
 be/src/io/fs/broker_file_reader.cpp                |   6 +-
 be/src/io/fs/broker_file_reader.h                  |   5 +-
 be/src/io/fs/broker_file_system.cpp                |   2 +-
 be/src/io/fs/buffered_reader.h                     |  12 +
 be/src/io/fs/file_reader.h                         |   3 +
 be/src/io/fs/hdfs_file_reader.cpp                  |   7 +-
 be/src/io/fs/hdfs_file_reader.h                    |   5 +-
 be/src/io/fs/http_file_reader.cpp                  |   7 +-
 be/src/io/fs/http_file_reader.h                    |   5 +-
 be/src/io/fs/http_file_system.cpp                  |   2 +-
 be/src/io/fs/local_file_reader.h                   |   2 +
 be/src/io/fs/packed_file_reader.h                  |   2 +
 be/src/io/fs/s3_file_reader.h                      |   2 +
 be/src/io/fs/stream_load_pipe.h                    |   2 +
 be/src/io/fs/tracing_file_reader.h                 |   2 +
 be/src/vec/exec/format/orc/orc_file_reader.h       |   2 +
 .../parquet/vparquet_column_chunk_reader.cpp       | 294 ++++++--
 .../format/parquet/vparquet_column_chunk_reader.h  |  45 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  31 +-
 .../exec/format/parquet/vparquet_column_reader.h   |  45 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  |   2 +-
 .../exec/format/parquet/vparquet_page_reader.cpp   |  76 ++
 .../vec/exec/format/parquet/vparquet_page_reader.h |  96 ++-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  31 +
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   8 +
 be/test/io/fs/buffered_reader_test.cpp             |   6 +
 be/test/io/fs/packed_file_concurrency_test.cpp     |   2 +
 be/test/io/fs/packed_file_reader_test.cpp          |   2 +
 be/test/io/fs/packed_file_system_test.cpp          |   2 +
 .../format/file_reader/file_meta_cache_test.cpp    |   2 +
 .../format/parquet/parquet_page_cache_test.cpp     | 804 +++++++++++++++++++++
 .../exec/format/parquet/parquet_thrift_test.cpp    |   3 +-
 be/test/vec/exec/orc/orc_file_reader_test.cpp      |   2 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  12 +
 gensrc/thrift/PaloInternalService.thrift           |   2 +
 40 files changed, 1469 insertions(+), 94 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 372a33dc886..c74d46069b2 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -404,6 +404,12 @@ DEFINE_Int32(index_page_cache_percentage, "10");
 DEFINE_mBool(disable_storage_page_cache, "false");
 // whether to disable row cache feature in storage
 DEFINE_mBool(disable_storage_row_cache, "true");
+// Parquet page cache: threshold ratio for caching decompressed vs compressed 
pages
+// If uncompressed_size / compressed_size <= threshold, cache decompressed;
+// otherwise cache compressed if enable_parquet_cache_compressed_pages = true
+DEFINE_Double(parquet_page_cache_decompress_threshold, "1.5");
+// Parquet page cache: whether to enable caching compressed pages (when ratio 
exceeds threshold)
+DEFINE_Bool(enable_parquet_cache_compressed_pages, "false");
 // whether to disable pk page cache feature in storage
 DEFINE_Bool(disable_pk_storage_page_cache, "false");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f8416557a2e..c7c06f00722 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -472,6 +472,12 @@ DECLARE_Int32(index_page_cache_percentage);
 DECLARE_Bool(disable_storage_page_cache);
 // whether to disable row cache feature in storage
 DECLARE_mBool(disable_storage_row_cache);
+// Parquet page cache: threshold ratio for caching decompressed vs compressed 
pages
+// If uncompressed_size / compressed_size <= threshold, cache decompressed;
+// otherwise cache compressed if enable_parquet_cache_compressed_pages = true
+DECLARE_Double(parquet_page_cache_decompress_threshold);
+// Parquet page cache: whether to enable caching compressed pages (when ratio 
exceeds threshold)
+DECLARE_Bool(enable_parquet_cache_compressed_pages);
 // whether to disable pk page cache feature in storage
 DECLARE_Bool(disable_pk_storage_page_cache);
 
diff --git a/be/src/io/cache/cached_remote_file_reader.h 
b/be/src/io/cache/cached_remote_file_reader.h
index 939471b62ea..20c1a47ce88 100644
--- a/be/src/io/cache/cached_remote_file_reader.h
+++ b/be/src/io/cache/cached_remote_file_reader.h
@@ -55,6 +55,8 @@ public:
 
     static std::pair<size_t, size_t> s_align_size(size_t offset, size_t size, 
size_t length);
 
+    int64_t mtime() const override { return _remote_file_reader->mtime(); }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index bd08bc20461..7b1821c0fa8 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -203,6 +203,18 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
         const io::FileSystemProperties& system_properties,
         const io::FileDescription& file_description, const 
io::FileReaderOptions& reader_options,
         RuntimeProfile* profile) {
+    auto reader_res = _create_file_reader_internal(system_properties, 
file_description,
+                                                   reader_options, profile);
+    if (!reader_res.has_value()) {
+        return unexpected(std::move(reader_res).error());
+    }
+    return std::move(reader_res).value();
+}
+
+Result<io::FileReaderSPtr> FileFactory::_create_file_reader_internal(
+        const io::FileSystemProperties& system_properties,
+        const io::FileDescription& file_description, const 
io::FileReaderOptions& reader_options,
+        RuntimeProfile* profile) {
     TFileType::type type = system_properties.system_type;
     switch (type) {
     case TFileType::FILE_LOCAL: {
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 0ba791bd0a3..61e322ca0af 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -126,6 +126,12 @@ public:
 
 private:
     static std::string _get_fs_name(const io::FileDescription& 
file_description);
+
+    /// Create FileReader without FS
+    static Result<io::FileReaderSPtr> _create_file_reader_internal(
+            const io::FileSystemProperties& system_properties,
+            const io::FileDescription& file_description,
+            const io::FileReaderOptions& reader_options, RuntimeProfile* 
profile = nullptr);
 };
 
 } // namespace doris
diff --git a/be/src/io/fs/broker_file_reader.cpp 
b/be/src/io/fs/broker_file_reader.cpp
index 102ea3e2477..41b2992f700 100644
--- a/be/src/io/fs/broker_file_reader.cpp
+++ b/be/src/io/fs/broker_file_reader.cpp
@@ -39,12 +39,14 @@ struct IOContext;
 
 BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path 
path, size_t file_size,
                                    TBrokerFD fd,
-                                   std::shared_ptr<BrokerServiceConnection> 
connection)
+                                   std::shared_ptr<BrokerServiceConnection> 
connection,
+                                   int64_t mtime)
         : _path(std::move(path)),
           _file_size(file_size),
           _broker_addr(broker_addr),
           _fd(fd),
-          _connection(std::move(connection)) {
+          _connection(std::move(connection)),
+          _mtime(mtime) {
     DorisMetrics::instance()->broker_file_open_reading->increment(1);
     DorisMetrics::instance()->broker_file_reader_total->increment(1);
 }
diff --git a/be/src/io/fs/broker_file_reader.h 
b/be/src/io/fs/broker_file_reader.h
index 7d19edb32c0..2f6bd94b652 100644
--- a/be/src/io/fs/broker_file_reader.h
+++ b/be/src/io/fs/broker_file_reader.h
@@ -38,7 +38,7 @@ struct IOContext;
 class BrokerFileReader final : public FileReader {
 public:
     BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t 
file_size, TBrokerFD fd,
-                     std::shared_ptr<BrokerServiceConnection> connection);
+                     std::shared_ptr<BrokerServiceConnection> connection, 
int64_t mtime = 0);
 
     ~BrokerFileReader() override;
 
@@ -50,6 +50,8 @@ public:
 
     bool closed() const override { return 
_closed.load(std::memory_order_acquire); }
 
+    int64_t mtime() const override { return _mtime; }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -62,6 +64,7 @@ private:
     TBrokerFD _fd;
 
     std::shared_ptr<BrokerServiceConnection> _connection;
+    int64_t _mtime;
     std::atomic<bool> _closed = false;
 };
 } // namespace doris::io
diff --git a/be/src/io/fs/broker_file_system.cpp 
b/be/src/io/fs/broker_file_system.cpp
index 8b0d5db23e2..b0dc89dc277 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -139,7 +139,7 @@ Status BrokerFileSystem::open_file_internal(const Path& 
file, FileReaderSPtr* re
                                error_msg(response->opStatus.message));
     }
     *reader = std::make_shared<BrokerFileReader>(_broker_addr, file, fsize, 
response->fd,
-                                                 _connection);
+                                                 _connection, opts.mtime);
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 5fe07176235..6ddcca02067 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -160,6 +160,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return _inner_reader->mtime(); }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -329,6 +331,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return _reader->mtime(); }
+
     // for test only
     size_t buffer_remaining() const { return _remaining; }
 
@@ -532,6 +536,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return _reader->mtime(); }
+
     void set_random_access_ranges(const std::vector<PrefetchRange>* 
random_access_ranges) {
         _random_access_ranges = random_access_ranges;
         for (auto& _pre_buffer : _pre_buffers) {
@@ -592,6 +598,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return _reader->mtime(); }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -626,6 +634,8 @@ public:
     virtual ~BufferedStreamReader() = default;
     // return the file path
     virtual std::string path() = 0;
+
+    virtual int64_t mtime() const = 0;
 };
 
 class BufferedFileStreamReader : public BufferedStreamReader, public 
ProfileCollector {
@@ -639,6 +649,8 @@ public:
     Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) 
override;
     std::string path() override { return _file->path(); }
 
+    int64_t mtime() const override { return _file->mtime(); }
+
 protected:
     void _collect_profile_before_close() override {
         if (_file != nullptr) {
diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h
index e6d8527e831..3df912cbad4 100644
--- a/be/src/io/fs/file_reader.h
+++ b/be/src/io/fs/file_reader.h
@@ -90,6 +90,9 @@ public:
 
     virtual const std::string& get_data_dir_path() { return 
VIRTUAL_REMOTE_DATA_DIR; }
 
+    // File modification time (seconds since epoch). Default to 0 meaning 
unknown.
+    virtual int64_t mtime() const = 0;
+
 protected:
     virtual Status read_at_impl(size_t offset, Slice result, size_t* 
bytes_read,
                                 const IOContext* io_ctx) = 0;
diff --git a/be/src/io/fs/hdfs_file_reader.cpp 
b/be/src/io/fs/hdfs_file_reader.cpp
index 0e278dff0c8..b1d65a63ba0 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -66,16 +66,17 @@ Result<FileReaderSPtr> HdfsFileReader::create(Path 
full_path, const hdfsFS& fs,
     auto path = convert_path(full_path, fs_name);
     return get_file(fs, path, opts.mtime, opts.file_size).transform([&](auto&& 
accessor) {
         return std::make_shared<HdfsFileReader>(std::move(path), 
std::move(fs_name),
-                                                std::move(accessor), profile);
+                                                std::move(accessor), profile, 
opts.mtime);
     });
 }
 
 HdfsFileReader::HdfsFileReader(Path path, std::string fs_name, 
FileHandleCache::Accessor accessor,
-                               RuntimeProfile* profile)
+                               RuntimeProfile* profile, int64_t mtime)
         : _path(std::move(path)),
           _fs_name(std::move(fs_name)),
           _accessor(std::move(accessor)),
-          _profile(profile) {
+          _profile(profile),
+          _mtime(mtime) {
     _handle = _accessor.get();
 
     DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index 8556eea0de6..08f98bca29a 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -45,7 +45,7 @@ public:
                                          const FileReaderOptions& opts, 
RuntimeProfile* profile);
 
     HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor 
accessor,
-                   RuntimeProfile* profile);
+                   RuntimeProfile* profile, int64_t mtime = 0);
 
     ~HdfsFileReader() override;
 
@@ -57,6 +57,8 @@ public:
 
     bool closed() const override { return 
_closed.load(std::memory_order_acquire); }
 
+    int64_t mtime() const override { return _mtime; }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -86,6 +88,7 @@ private:
     CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle
     std::atomic<bool> _closed = false;
     RuntimeProfile* _profile = nullptr;
+    int64_t _mtime;
 #ifdef USE_HADOOP_HDFS
     HDFSProfile _hdfs_profile;
 #endif
diff --git a/be/src/io/fs/http_file_reader.cpp 
b/be/src/io/fs/http_file_reader.cpp
index fb243179baf..5ad984039fc 100644
--- a/be/src/io/fs/http_file_reader.cpp
+++ b/be/src/io/fs/http_file_reader.cpp
@@ -34,7 +34,7 @@ Result<FileReaderSPtr> HttpFileReader::create(const 
std::string& url,
     ofi.path = Path(url);
     ofi.extend_info = props;
 
-    auto reader = std::make_shared<HttpFileReader>(ofi, url);
+    auto reader = std::make_shared<HttpFileReader>(ofi, url, opts.mtime);
 
     // Open the file to detect Range support and validate configuration
     RETURN_IF_ERROR_RESULT(reader->open(opts));
@@ -42,11 +42,12 @@ Result<FileReaderSPtr> HttpFileReader::create(const 
std::string& url,
     return reader;
 }
 
-HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url)
+HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, 
int64_t mtime)
         : _extend_kv(fileInfo.extend_info),
           _path(fileInfo.path),
           _url(std::move(url)),
-          _client(std::make_unique<HttpClient>()) {
+          _client(std::make_unique<HttpClient>()),
+          _mtime(mtime) {
     auto etag_iter = _extend_kv.find("etag");
     if (etag_iter != _extend_kv.end()) {
         _etag = etag_iter->second;
diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h
index 607eedf3d1a..982e65905aa 100644
--- a/be/src/io/fs/http_file_reader.h
+++ b/be/src/io/fs/http_file_reader.h
@@ -41,7 +41,7 @@ public:
                                          const std::map<std::string, 
std::string>& props,
                                          const FileReaderOptions& opts, 
RuntimeProfile* profile);
 
-    explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url);
+    explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url, 
int64_t mtime);
     ~HttpFileReader() override;
 
     Status open(const FileReaderOptions& opts);
@@ -52,6 +52,8 @@ public:
     bool closed() const override { return 
_closed.load(std::memory_order_acquire); }
     size_t size() const override { return _file_size; }
 
+    int64_t mtime() const override { return _mtime; }
+
 private:
     // Prepare and initialize the HTTP client for a new request
     Status prepare_client(bool set_fail_on_error = true);
@@ -78,6 +80,7 @@ private:
     int64_t _last_modified = 0;
     std::atomic<bool> _closed = false;
     std::unique_ptr<HttpClient> _client;
+    int64_t _mtime;
 
     // Configuration for non-Range request handling
     bool _enable_range_request = true;                         // Whether 
Range request is required
diff --git a/be/src/io/fs/http_file_system.cpp 
b/be/src/io/fs/http_file_system.cpp
index 92e175ca774..b1e8de354ad 100644
--- a/be/src/io/fs/http_file_system.cpp
+++ b/be/src/io/fs/http_file_system.cpp
@@ -56,7 +56,7 @@ Status HttpFileSystem::open_file_internal(const Path& path, 
FileReaderSPtr* read
     // Pass properties (including HTTP headers) to the file reader
     file_info.extend_info = _properties;
 
-    auto http_reader = std::make_shared<HttpFileReader>(file_info, 
path.native());
+    auto http_reader = std::make_shared<HttpFileReader>(file_info, 
path.native(), opts.mtime);
     RETURN_IF_ERROR(http_reader->open(opts));
     *reader = http_reader;
     return Status::OK();
diff --git a/be/src/io/fs/local_file_reader.h b/be/src/io/fs/local_file_reader.h
index 38b4cfa55af..e11bb152b67 100644
--- a/be/src/io/fs/local_file_reader.h
+++ b/be/src/io/fs/local_file_reader.h
@@ -63,6 +63,8 @@ public:
 
     const std::string& get_data_dir_path() override { return _data_dir_path; }
 
+    int64_t mtime() const override { return 0; }
+
 private:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
diff --git a/be/src/io/fs/packed_file_reader.h 
b/be/src/io/fs/packed_file_reader.h
index 79cd23c8cd7..b6b423fbbfe 100644
--- a/be/src/io/fs/packed_file_reader.h
+++ b/be/src/io/fs/packed_file_reader.h
@@ -48,6 +48,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return _inner_reader->mtime(); }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h
index 58294ec1891..40e3ac61d3a 100644
--- a/be/src/io/fs/s3_file_reader.h
+++ b/be/src/io/fs/s3_file_reader.h
@@ -53,6 +53,8 @@ public:
 
     bool closed() const override { return 
_closed.load(std::memory_order_acquire); }
 
+    int64_t mtime() const override { return 0; }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index cedab0b6c17..df137a9267c 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -57,6 +57,8 @@ public:
 
     size_t size() const override { return 0; }
 
+    int64_t mtime() const override { return 0; }
+
     // called when consumer finished
     Status close() override {
         if (!(_finished || _cancelled)) {
diff --git a/be/src/io/fs/tracing_file_reader.h 
b/be/src/io/fs/tracing_file_reader.h
index 39b70dfbb63..7a6651afd21 100644
--- a/be/src/io/fs/tracing_file_reader.h
+++ b/be/src/io/fs/tracing_file_reader.h
@@ -47,6 +47,8 @@ public:
     void _collect_profile_at_runtime() override { return 
_inner->collect_profile_at_runtime(); }
     void _collect_profile_before_close() override { return 
_inner->collect_profile_before_close(); }
 
+    int64_t mtime() const override { return _inner->mtime(); }
+
     FileReaderStats* stats() const { return _stats; }
     doris::io::FileReaderSPtr inner_reader() { return _inner; }
 
diff --git a/be/src/vec/exec/format/orc/orc_file_reader.h 
b/be/src/vec/exec/format/orc/orc_file_reader.h
index 503777e67c2..15aeed33242 100644
--- a/be/src/vec/exec/format/orc/orc_file_reader.h
+++ b/be/src/vec/exec/format/orc/orc_file_reader.h
@@ -54,6 +54,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return _inner_reader->mtime(); }
+
     // for test only
     const Statistics& statistics() const { return _statistics; }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index e7f5f94d32f..7e228e58dff 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -22,9 +22,12 @@
 #include <string.h>
 
 #include <cstdint>
+#include <memory>
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "io/fs/buffered_reader.h"
+#include "olap/page_cache.h"
 #include "util/bit_util.h"
 #include "util/block_compression.h"
 #include "util/runtime_profile.h"
@@ -52,7 +55,7 @@ template <bool IN_COLLECTION, bool OFFSET_INDEX>
 ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::ColumnChunkReader(
         io::BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
         FieldSchema* field_schema, const tparquet::OffsetIndex* offset_index, 
size_t total_rows,
-        io::IOContext* io_ctx)
+        io::IOContext* io_ctx, const ParquetPageReadContext& page_read_ctx)
         : _field_schema(field_schema),
           _max_rep_level(field_schema->repetition_level),
           _max_def_level(field_schema->definition_level),
@@ -60,7 +63,8 @@ ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::ColumnChunkReader(
           _metadata(column_chunk->meta_data),
           _offset_index(offset_index),
           _total_rows(total_rows),
-          _io_ctx(io_ctx) {}
+          _io_ctx(io_ctx),
+          _page_read_ctx(page_read_ctx) {}
 
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::init() {
@@ -69,7 +73,8 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::init() 
{
     size_t chunk_size = _metadata.total_compressed_size;
     // create page reader
     _page_reader = create_page_reader<IN_COLLECTION, OFFSET_INDEX>(
-            _stream_reader, _io_ctx, start_offset, chunk_size, _total_rows, 
_offset_index);
+            _stream_reader, _io_ctx, start_offset, chunk_size, _total_rows, 
_metadata,
+            _page_read_ctx, _offset_index);
     // get the block compression codec
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, 
&_block_compress_codec));
     _state = INITIALIZED;
@@ -104,7 +109,7 @@ Status ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::_parse_first_page_header(
     RETURN_IF_ERROR(parse_page_header());
 
     const tparquet::PageHeader* header = nullptr;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     if (header->type == tparquet::PageType::DICTIONARY_PAGE) {
         // the first page maybe directory page even if 
_metadata.__isset.dictionary_page_offset == false,
         // so we should parse the directory page in next_page()
@@ -125,8 +130,7 @@ Status ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::parse_page_header() {
     RETURN_IF_ERROR(_page_reader->parse_page_header());
 
     const tparquet::PageHeader* header = nullptr;
-    ;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     int32_t page_num_values = _page_reader->is_header_v2() ? 
header->data_page_header_v2.num_values
                                                            : 
header->data_page_header.num_values;
     _remaining_rep_nums = page_num_values;
@@ -169,37 +173,144 @@ Status ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::load_page_data() {
     }
 
     const tparquet::PageHeader* header = nullptr;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     int32_t uncompressed_size = header->uncompressed_page_size;
+    bool page_loaded = false;
+
+    // First, try to reuse a cache handle previously discovered by PageReader
+    // (header-only lookup) to avoid a second lookup here.
+    if (_page_read_ctx.enable_parquet_file_page_cache && 
!config::disable_storage_page_cache &&
+        StoragePageCache::instance() != nullptr) {
+        if (_page_reader->has_page_cache_handle()) {
+            const PageCacheHandle& handle = _page_reader->page_cache_handle();
+            Slice cached = handle.data();
+            size_t header_size = _page_reader->header_bytes().size();
+            size_t levels_size = 0;
+            if (header->__isset.data_page_header_v2) {
+                const tparquet::DataPageHeaderV2& header_v2 = 
header->data_page_header_v2;
+                size_t rl = header_v2.repetition_levels_byte_length;
+                size_t dl = header_v2.definition_levels_byte_length;
+                levels_size = rl + dl;
+                _v2_rep_levels =
+                        Slice(reinterpret_cast<const uint8_t*>(cached.data) + 
header_size, rl);
+                _v2_def_levels =
+                        Slice(reinterpret_cast<const uint8_t*>(cached.data) + 
header_size + rl, dl);
+            }
+            // payload_slice points to the bytes after header and levels
+            Slice payload_slice(cached.data + header_size + levels_size,
+                                cached.size - header_size - levels_size);
 
-    if (_block_compress_codec != nullptr) {
-        Slice compressed_data;
-        RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
-        if (header->__isset.data_page_header_v2) {
-            const tparquet::DataPageHeaderV2& header_v2 = 
header->data_page_header_v2;
-            // uncompressed_size = rl + dl + uncompressed_data_size
-            // compressed_size = rl + dl + compressed_data_size
-            uncompressed_size -= header_v2.repetition_levels_byte_length +
-                                 header_v2.definition_levels_byte_length;
-            _get_uncompressed_levels(header_v2, compressed_data);
+            bool cache_payload_is_decompressed = 
_page_reader->is_cache_payload_decompressed();
+
+            if (cache_payload_is_decompressed) {
+                // Cached payload is already uncompressed
+                _page_data = payload_slice;
+            } else {
+                CHECK(_block_compress_codec);
+                // Decompress cached payload into _decompress_buf for decoding
+                size_t uncompressed_payload_size =
+                        header->__isset.data_page_header_v2
+                                ? 
static_cast<size_t>(header->uncompressed_page_size) - levels_size
+                                : 
static_cast<size_t>(header->uncompressed_page_size);
+                _reserve_decompress_buf(uncompressed_payload_size);
+                _page_data = Slice(_decompress_buf.get(), 
uncompressed_payload_size);
+                SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
+                _chunk_statistics.decompress_cnt++;
+                
RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &_page_data));
+            }
+            // page cache counters were incremented when PageReader did the 
header-only
+            // cache lookup. Do not increment again to avoid double-counting.
+            page_loaded = true;
         }
-        bool is_v2_compressed =
-                header->__isset.data_page_header_v2 && 
header->data_page_header_v2.is_compressed;
-        if (header->__isset.data_page_header || is_v2_compressed) {
-            // check decompressed buffer size
-            _reserve_decompress_buf(uncompressed_size);
-            _page_data = Slice(_decompress_buf.get(), uncompressed_size);
-            SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
-            _chunk_statistics.decompress_cnt++;
-            RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, 
&_page_data));
+    }
+
+    if (!page_loaded) {
+        if (_block_compress_codec != nullptr) {
+            Slice compressed_data;
+            RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
+            std::vector<uint8_t> level_bytes;
+            if (header->__isset.data_page_header_v2) {
+                const tparquet::DataPageHeaderV2& header_v2 = 
header->data_page_header_v2;
+                // uncompressed_size = rl + dl + uncompressed_data_size
+                // compressed_size = rl + dl + compressed_data_size
+                uncompressed_size -= header_v2.repetition_levels_byte_length +
+                                     header_v2.definition_levels_byte_length;
+                // copy level bytes (rl + dl) so that we can cache header + 
levels + uncompressed payload
+                size_t rl = header_v2.repetition_levels_byte_length;
+                size_t dl = header_v2.definition_levels_byte_length;
+                size_t level_sz = rl + dl;
+                if (level_sz > 0) {
+                    level_bytes.resize(level_sz);
+                    memcpy(level_bytes.data(), compressed_data.data, level_sz);
+                }
+                // now remove levels from compressed_data for decompression
+                _get_uncompressed_levels(header_v2, compressed_data);
+            }
+            bool is_v2_compressed = header->__isset.data_page_header_v2 &&
+                                    header->data_page_header_v2.is_compressed;
+            bool page_has_compression = header->__isset.data_page_header || 
is_v2_compressed;
+
+            if (page_has_compression) {
+                // Decompress payload for immediate decoding
+                _reserve_decompress_buf(uncompressed_size);
+                _page_data = Slice(_decompress_buf.get(), uncompressed_size);
+                SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
+                _chunk_statistics.decompress_cnt++;
+                
RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, 
&_page_data));
+
+                // Decide whether to cache decompressed payload or compressed payload based on threshold
+                bool cache_payload_decompressed = 
should_cache_decompressed(header, _metadata);
+
+                if (_page_read_ctx.enable_parquet_file_page_cache &&
+                    !config::disable_storage_page_cache &&
+                    StoragePageCache::instance() != nullptr &&
+                    !_page_reader->header_bytes().empty()) {
+                    if (cache_payload_decompressed) {
+                        _insert_page_into_cache(level_bytes, _page_data);
+                        
_chunk_statistics.page_cache_decompressed_write_counter += 1;
+                    } else {
+                        if (config::enable_parquet_cache_compressed_pages) {
+                            // cache the compressed payload as-is (header | levels | compressed_payload)
+                            _insert_page_into_cache(
+                                    level_bytes, Slice(compressed_data.data, 
compressed_data.size));
+                            
_chunk_statistics.page_cache_compressed_write_counter += 1;
+                        }
+                    }
+                }
+            } else {
+                // no compression on this page, use the data directly
+                _page_data = Slice(compressed_data.data, compressed_data.size);
+                if (_page_read_ctx.enable_parquet_file_page_cache &&
+                    !config::disable_storage_page_cache &&
+                    StoragePageCache::instance() != nullptr) {
+                    _insert_page_into_cache(level_bytes, _page_data);
+                    _chunk_statistics.page_cache_decompressed_write_counter += 
1;
+                }
+            }
         } else {
-            // Don't need decompress
-            _page_data = Slice(compressed_data.data, compressed_data.size);
-        }
-    } else {
-        RETURN_IF_ERROR(_page_reader->get_page_data(_page_data));
-        if (header->__isset.data_page_header_v2) {
-            _get_uncompressed_levels(header->data_page_header_v2, _page_data);
+            // For uncompressed page, we may still need to extract v2 levels
+            std::vector<uint8_t> level_bytes;
+            Slice uncompressed_data;
+            RETURN_IF_ERROR(_page_reader->get_page_data(uncompressed_data));
+            if (header->__isset.data_page_header_v2) {
+                const tparquet::DataPageHeaderV2& header_v2 = 
header->data_page_header_v2;
+                size_t rl = header_v2.repetition_levels_byte_length;
+                size_t dl = header_v2.definition_levels_byte_length;
+                size_t level_sz = rl + dl;
+                if (level_sz > 0) {
+                    level_bytes.resize(level_sz);
+                    memcpy(level_bytes.data(), uncompressed_data.data, 
level_sz);
+                }
+                _get_uncompressed_levels(header_v2, uncompressed_data);
+            }
+            // copy page data out
+            _page_data = Slice(uncompressed_data.data, uncompressed_data.size);
+            // Optionally cache uncompressed data for uncompressed pages
+            if (_page_read_ctx.enable_parquet_file_page_cache &&
+                !config::disable_storage_page_cache && 
StoragePageCache::instance() != nullptr) {
+                _insert_page_into_cache(level_bytes, _page_data);
+                _chunk_statistics.page_cache_decompressed_write_counter += 1;
+            }
         }
     }
 
@@ -244,7 +355,6 @@ Status ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::load_page_data() {
         _decoders[static_cast<int>(encoding)] = std::move(page_decoder);
         _page_decoder = _decoders[static_cast<int>(encoding)].get();
     }
-    // Reset page data for each page
     RETURN_IF_ERROR(_page_decoder->set_data(&_page_data));
 
     _state = DATA_LOADED;
@@ -254,7 +364,7 @@ Status ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::load_page_data() {
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_decode_dict_page() {
     const tparquet::PageHeader* header = nullptr;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type);
     SCOPED_RAW_TIMER(&_chunk_statistics.decode_dict_time);
 
@@ -271,16 +381,84 @@ Status ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::_decode_dict_page() {
     // Prepare dictionary data
     int32_t uncompressed_size = header->uncompressed_page_size;
     auto dict_data = make_unique_buffer<uint8_t>(uncompressed_size);
-    if (_block_compress_codec != nullptr) {
-        Slice compressed_data;
-        RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
-        Slice dict_slice(dict_data.get(), uncompressed_size);
-        RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, 
&dict_slice));
-    } else {
-        Slice dict_slice;
-        RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice));
-        // The data is stored by BufferedStreamReader, we should copy it out
-        memcpy(dict_data.get(), dict_slice.data, dict_slice.size);
+    bool dict_loaded = false;
+
+    // Try to load dictionary page from cache
+    if (_page_read_ctx.enable_parquet_file_page_cache && 
!config::disable_storage_page_cache &&
+        StoragePageCache::instance() != nullptr) {
+        if (_page_reader->has_page_cache_handle()) {
+            const PageCacheHandle& handle = _page_reader->page_cache_handle();
+            Slice cached = handle.data();
+            size_t header_size = _page_reader->header_bytes().size();
+            // Dictionary page layout in cache: header | payload (compressed or uncompressed)
+            Slice payload_slice(cached.data + header_size, cached.size - 
header_size);
+
+            bool cache_payload_is_decompressed = 
_page_reader->is_cache_payload_decompressed();
+
+            if (cache_payload_is_decompressed) {
+                // Use cached decompressed dictionary data
+                memcpy(dict_data.get(), payload_slice.data, 
payload_slice.size);
+                dict_loaded = true;
+            } else {
+                CHECK(_block_compress_codec);
+                // Decompress cached compressed dictionary data
+                Slice dict_slice(dict_data.get(), uncompressed_size);
+                
RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &dict_slice));
+                dict_loaded = true;
+            }
+
+            // When dictionary page is loaded from cache, we need to skip the page data
+            // to update the offset correctly (similar to calling get_page_data())
+            if (dict_loaded) {
+                _page_reader->skip_page_data();
+            }
+        }
+    }
+
+    if (!dict_loaded) {
+        // Load and decompress dictionary page from file
+        if (_block_compress_codec != nullptr) {
+            Slice compressed_data;
+            RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
+            Slice dict_slice(dict_data.get(), uncompressed_size);
+            RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, 
&dict_slice));
+
+            // Decide whether to cache decompressed or compressed dictionary based on threshold
+            bool cache_payload_decompressed = 
should_cache_decompressed(header, _metadata);
+
+            if (_page_read_ctx.enable_parquet_file_page_cache &&
+                !config::disable_storage_page_cache && 
StoragePageCache::instance() != nullptr &&
+                !_page_reader->header_bytes().empty()) {
+                std::vector<uint8_t> empty_levels; // Dictionary pages don't have levels
+                if (cache_payload_decompressed) {
+                    // Cache the decompressed dictionary page
+                    _insert_page_into_cache(empty_levels, dict_slice);
+                    _chunk_statistics.page_cache_decompressed_write_counter += 
1;
+                } else {
+                    if (config::enable_parquet_cache_compressed_pages) {
+                        // Cache the compressed dictionary page
+                        _insert_page_into_cache(empty_levels,
+                                                Slice(compressed_data.data, 
compressed_data.size));
+                        _chunk_statistics.page_cache_compressed_write_counter 
+= 1;
+                    }
+                }
+            }
+        } else {
+            Slice dict_slice;
+            RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice));
+            // The data is stored by BufferedStreamReader, we should copy it out
+            memcpy(dict_data.get(), dict_slice.data, dict_slice.size);
+
+            // Cache the uncompressed dictionary page
+            if (_page_read_ctx.enable_parquet_file_page_cache &&
+                !config::disable_storage_page_cache && 
StoragePageCache::instance() != nullptr &&
+                !_page_reader->header_bytes().empty()) {
+                std::vector<uint8_t> empty_levels;
+                Slice payload(dict_data.get(), uncompressed_size);
+                _insert_page_into_cache(empty_levels, payload);
+                _chunk_statistics.page_cache_decompressed_write_counter += 1;
+            }
+        }
     }
 
     // Cache page decoder
@@ -306,6 +484,32 @@ void ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>::_reserve_decompress_buf(siz
     }
 }
 
+template <bool IN_COLLECTION, bool OFFSET_INDEX>
+void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_insert_page_into_cache(
+        const std::vector<uint8_t>& level_bytes, const Slice& payload) {
+    StoragePageCache::CacheKey key =
+            
_page_reader->make_page_cache_key(_page_reader->header_start_offset());
+    const std::vector<uint8_t>& header_bytes = _page_reader->header_bytes();
+    size_t total = header_bytes.size() + level_bytes.size() + payload.size;
+    auto page = std::make_unique<DataPage>(total, true, segment_v2::DATA_PAGE);
+    size_t pos = 0;
+    memcpy(page->data() + pos, header_bytes.data(), header_bytes.size());
+    pos += header_bytes.size();
+    if (!level_bytes.empty()) {
+        memcpy(page->data() + pos, level_bytes.data(), level_bytes.size());
+        pos += level_bytes.size();
+    }
+    if (payload.size > 0) {
+        memcpy(page->data() + pos, payload.data, payload.size);
+        pos += payload.size;
+    }
+    page->reset_size(total);
+    PageCacheHandle handle;
+    StoragePageCache::instance()->insert(key, page.get(), &handle, 
segment_v2::DATA_PAGE);
+    page.release();
+    _chunk_statistics.page_cache_write_counter += 1;
+}
+
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::skip_values(size_t 
num_values,
                                                                    bool 
skip_data) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 9e77a3139f6..d0bf7ab2d81 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -61,20 +61,19 @@ struct ColumnChunkReaderStatistics {
     int64_t skip_page_header_num = 0;
     int64_t parse_page_header_num = 0;
     int64_t read_page_header_time = 0;
+    int64_t page_read_counter = 0;
+    int64_t page_cache_write_counter = 0;
+    int64_t page_cache_compressed_write_counter = 0;
+    int64_t page_cache_decompressed_write_counter = 0;
+    int64_t page_cache_hit_counter = 0;
+    int64_t page_cache_missing_counter = 0;
+    int64_t page_cache_compressed_hit_counter = 0;
+    int64_t page_cache_decompressed_hit_counter = 0;
 };
 
 /**
  * Read and decode parquet column data into doris block column.
- * <p>Usage:</p>    struct ColumnChunkReaderStatistics {
-        int64_t decompress_time = 0;
-        int64_t decompress_cnt = 0;
-        int64_t decode_header_time = 0;
-        int64_t decode_value_time = 0;
-        int64_t decode_dict_time = 0;
-        int64_t decode_level_time = 0;
-        int64_t skip_page_header_num = 0;
-        int64_t parse_page_header_num = 0;
-    };
+ * <p>Usage:</p>
  * // Create chunk reader
  * ColumnChunkReader chunk_reader(BufferedStreamReader* reader,
  *                                tparquet::ColumnChunk* column_chunk,
@@ -97,7 +96,8 @@ class ColumnChunkReader {
 public:
     ColumnChunkReader(io::BufferedStreamReader* reader, tparquet::ColumnChunk* 
column_chunk,
                       FieldSchema* field_schema, const tparquet::OffsetIndex* 
offset_index,
-                      size_t total_row, io::IOContext* io_ctx);
+                      size_t total_row, io::IOContext* io_ctx,
+                      const ParquetPageReadContext& page_read_ctx);
     ~ColumnChunkReader() = default;
 
     // Initialize chunk reader, will generate the decoder and codec.
@@ -155,6 +155,21 @@ public:
                 _page_reader->page_statistics().parse_page_header_num;
         _chunk_statistics.read_page_header_time =
                 _page_reader->page_statistics().read_page_header_time;
+        _chunk_statistics.page_read_counter += 
_page_reader->page_statistics().page_read_counter;
+        _chunk_statistics.page_cache_write_counter +=
+                _page_reader->page_statistics().page_cache_write_counter;
+        _chunk_statistics.page_cache_compressed_write_counter +=
+                
_page_reader->page_statistics().page_cache_compressed_write_counter;
+        _chunk_statistics.page_cache_decompressed_write_counter +=
+                
_page_reader->page_statistics().page_cache_decompressed_write_counter;
+        _chunk_statistics.page_cache_hit_counter +=
+                _page_reader->page_statistics().page_cache_hit_counter;
+        _chunk_statistics.page_cache_missing_counter +=
+                _page_reader->page_statistics().page_cache_missing_counter;
+        _chunk_statistics.page_cache_compressed_hit_counter +=
+                
_page_reader->page_statistics().page_cache_compressed_hit_counter;
+        _chunk_statistics.page_cache_decompressed_hit_counter +=
+                
_page_reader->page_statistics().page_cache_decompressed_hit_counter;
         return _chunk_statistics;
     }
 
@@ -193,6 +208,11 @@ public:
                                  size_t* result_rows, bool* cross_page);
     Status load_cross_page_nested_row(std::vector<level_t>& rep_levels, bool* 
cross_page);
 
+    Slice get_page_data() const { return _page_data; }
+    const Slice& v2_rep_levels() const { return _v2_rep_levels; }
+    const Slice& v2_def_levels() const { return _v2_def_levels; }
+    ColumnChunkReaderStatistics& statistics() { return chunk_statistics(); }
+
 private:
     enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, 
DATA_LOADED, PAGE_SKIPPED };
 
@@ -202,6 +222,7 @@ private:
 
     void _reserve_decompress_buf(size_t size);
     int32_t _get_type_length();
+    void _insert_page_into_cache(const std::vector<uint8_t>& level_bytes, 
const Slice& payload);
 
     void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, 
Slice& page_data);
     Status _skip_nested_rows_in_page(size_t num_rows);
@@ -234,6 +255,8 @@ private:
     std::unique_ptr<PageReader<IN_COLLECTION, OFFSET_INDEX>> _page_reader;
     BlockCompressionCodec* _block_compress_codec = nullptr;
 
+    ParquetPageReadContext _page_read_ctx;
+
     LevelDecoder _rep_level_decoder;
     LevelDecoder _def_level_decoder;
     size_t _chunk_parsed_values = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 30962632662..656e599a962 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -110,13 +110,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr 
file, FieldSchema* field,
                                    std::unique_ptr<ParquetColumnReader>& 
reader,
                                    size_t max_buf_size,
                                    std::unordered_map<int, 
tparquet::OffsetIndex>& col_offsets,
-                                   bool in_collection, const 
std::set<uint64_t>& column_ids,
+                                   RuntimeState* state, bool in_collection,
+                                   const std::set<uint64_t>& column_ids,
                                    const std::set<uint64_t>& 
filter_column_ids) {
     size_t total_rows = row_group.num_rows;
     if (field->data_type->get_primitive_type() == TYPE_ARRAY) {
         std::unique_ptr<ParquetColumnReader> element_reader;
         RETURN_IF_ERROR(create(file, &field->children[0], row_group, 
row_ranges, ctz, io_ctx,
-                               element_reader, max_buf_size, col_offsets, 
true, column_ids,
+                               element_reader, max_buf_size, col_offsets, 
state, true, column_ids,
                                filter_column_ids));
         auto array_reader = ArrayColumnReader::create_unique(row_ranges, 
total_rows, ctz, io_ctx);
         element_reader->set_column_in_nested();
@@ -130,7 +131,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
             column_ids.find(field->children[0].get_column_id()) != 
column_ids.end()) {
             // Create key reader
             RETURN_IF_ERROR(create(file, &field->children[0], row_group, 
row_ranges, ctz, io_ctx,
-                                   key_reader, max_buf_size, col_offsets, 
true, column_ids,
+                                   key_reader, max_buf_size, col_offsets, 
state, true, column_ids,
                                    filter_column_ids));
         } else {
             auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, 
total_rows, ctz,
@@ -142,7 +143,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
             column_ids.find(field->children[1].get_column_id()) != 
column_ids.end()) {
             // Create value reader
             RETURN_IF_ERROR(create(file, &field->children[1], row_group, 
row_ranges, ctz, io_ctx,
-                                   value_reader, max_buf_size, col_offsets, 
true, column_ids,
+                                   value_reader, max_buf_size, col_offsets, 
state, true, column_ids,
                                    filter_column_ids));
         } else {
             auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, 
total_rows, ctz,
@@ -165,8 +166,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
             std::unique_ptr<ParquetColumnReader> child_reader;
             if (column_ids.empty() || column_ids.find(child.get_column_id()) 
!= column_ids.end()) {
                 RETURN_IF_ERROR(create(file, &child, row_group, row_ranges, 
ctz, io_ctx,
-                                       child_reader, max_buf_size, 
col_offsets, in_collection,
-                                       column_ids, filter_column_ids));
+                                       child_reader, max_buf_size, 
col_offsets, state,
+                                       in_collection, column_ids, 
filter_column_ids));
                 child_readers[child.name] = std::move(child_reader);
                 // Record the first non-SkippingReader
                 if (non_skip_reader_idx == -1) {
@@ -184,7 +185,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, 
FieldSchema* field,
         if (non_skip_reader_idx == -1) {
             std::unique_ptr<ParquetColumnReader> child_reader;
             RETURN_IF_ERROR(create(file, &field->children[0], row_group, 
row_ranges, ctz, io_ctx,
-                                   child_reader, max_buf_size, col_offsets, 
in_collection,
+                                   child_reader, max_buf_size, col_offsets, 
state, in_collection,
                                    column_ids, filter_column_ids));
             child_reader->set_column_in_nested();
             child_readers[field->children[0].name] = std::move(child_reader);
@@ -205,14 +206,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr 
file, FieldSchema* field,
                 auto scalar_reader = ScalarColumnReader<true, 
false>::create_unique(
                         row_ranges, total_rows, chunk, offset_index, ctz, 
io_ctx);
 
-                RETURN_IF_ERROR(scalar_reader->init(file, field, 
max_buf_size));
+                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, 
state));
                 scalar_reader->_filter_column_ids = filter_column_ids;
                 reader.reset(scalar_reader.release());
             } else {
                 auto scalar_reader = ScalarColumnReader<true, 
true>::create_unique(
                         row_ranges, total_rows, chunk, offset_index, ctz, 
io_ctx);
 
-                RETURN_IF_ERROR(scalar_reader->init(file, field, 
max_buf_size));
+                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, 
state));
                 scalar_reader->_filter_column_ids = filter_column_ids;
                 reader.reset(scalar_reader.release());
             }
@@ -221,14 +222,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr 
file, FieldSchema* field,
                 auto scalar_reader = ScalarColumnReader<false, 
false>::create_unique(
                         row_ranges, total_rows, chunk, offset_index, ctz, 
io_ctx);
 
-                RETURN_IF_ERROR(scalar_reader->init(file, field, 
max_buf_size));
+                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, 
state));
                 scalar_reader->_filter_column_ids = filter_column_ids;
                 reader.reset(scalar_reader.release());
             } else {
                 auto scalar_reader = ScalarColumnReader<false, 
true>::create_unique(
                         row_ranges, total_rows, chunk, offset_index, ctz, 
io_ctx);
 
-                RETURN_IF_ERROR(scalar_reader->init(file, field, 
max_buf_size));
+                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, 
state));
                 scalar_reader->_filter_column_ids = filter_column_ids;
                 reader.reset(scalar_reader.release());
             }
@@ -246,7 +247,8 @@ void ParquetColumnReader::_generate_read_ranges(RowRange 
page_row_range,
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 Status ScalarColumnReader<IN_COLLECTION, 
OFFSET_INDEX>::init(io::FileReaderSPtr file,
                                                              FieldSchema* 
field,
-                                                             size_t 
max_buf_size) {
+                                                             size_t 
max_buf_size,
+                                                             RuntimeState* 
state) {
     _field_schema = field;
     auto& chunk_meta = _chunk_meta.meta_data;
     int64_t chunk_start = has_dict_page(chunk_meta) ? 
chunk_meta.dictionary_page_offset
@@ -262,8 +264,11 @@ Status ScalarColumnReader<IN_COLLECTION, 
OFFSET_INDEX>::init(io::FileReaderSPtr
     }
     _stream_reader = std::make_unique<io::BufferedFileStreamReader>(file, 
chunk_start, chunk_len,
                                                                     
prefetch_buffer_size);
+    ParquetPageReadContext ctx(
+            (state == nullptr) ? true : 
state->query_options().enable_parquet_file_page_cache);
+
     _chunk_reader = std::make_unique<ColumnChunkReader<IN_COLLECTION, 
OFFSET_INDEX>>(
-            _stream_reader.get(), &_chunk_meta, field, _offset_index, 
_total_rows, _io_ctx);
+            _stream_reader.get(), &_chunk_meta, field, _offset_index, 
_total_rows, _io_ctx, ctx);
     RETURN_IF_ERROR(_chunk_reader->init());
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 2a21ddd84cb..97ad9d1fd3a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -65,7 +65,15 @@ public:
                   decode_null_map_time(0),
                   skip_page_header_num(0),
                   parse_page_header_num(0),
-                  read_page_header_time(0) {}
+                  read_page_header_time(0),
+                  page_read_counter(0),
+                  page_cache_write_counter(0),
+                  page_cache_compressed_write_counter(0),
+                  page_cache_decompressed_write_counter(0),
+                  page_cache_hit_counter(0),
+                  page_cache_missing_counter(0),
+                  page_cache_compressed_hit_counter(0),
+                  page_cache_decompressed_hit_counter(0) {}
 
         ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t 
null_map_time)
                 : page_index_read_calls(0),
@@ -78,7 +86,15 @@ public:
                   decode_null_map_time(null_map_time),
                   skip_page_header_num(cs.skip_page_header_num),
                   parse_page_header_num(cs.parse_page_header_num),
-                  read_page_header_time(cs.read_page_header_time) {}
+                  read_page_header_time(cs.read_page_header_time),
+                  page_read_counter(cs.page_read_counter),
+                  page_cache_write_counter(cs.page_cache_write_counter),
+                  
page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter),
+                  
page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter),
+                  page_cache_hit_counter(cs.page_cache_hit_counter),
+                  page_cache_missing_counter(cs.page_cache_missing_counter),
+                  
page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter),
+                  
page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter) {}
 
         int64_t page_index_read_calls;
         int64_t decompress_time;
@@ -91,6 +107,14 @@ public:
         int64_t skip_page_header_num;
         int64_t parse_page_header_num;
         int64_t read_page_header_time;
+        int64_t page_read_counter;
+        int64_t page_cache_write_counter;
+        int64_t page_cache_compressed_write_counter;
+        int64_t page_cache_decompressed_write_counter;
+        int64_t page_cache_hit_counter;
+        int64_t page_cache_missing_counter;
+        int64_t page_cache_compressed_hit_counter;
+        int64_t page_cache_decompressed_hit_counter;
 
         void merge(ColumnStatistics& col_statistics) {
             page_index_read_calls += col_statistics.page_index_read_calls;
@@ -104,6 +128,17 @@ public:
             skip_page_header_num += col_statistics.skip_page_header_num;
             parse_page_header_num += col_statistics.parse_page_header_num;
             read_page_header_time += col_statistics.read_page_header_time;
+            page_read_counter += col_statistics.page_read_counter;
+            page_cache_write_counter += 
col_statistics.page_cache_write_counter;
+            page_cache_compressed_write_counter +=
+                    col_statistics.page_cache_compressed_write_counter;
+            page_cache_decompressed_write_counter +=
+                    col_statistics.page_cache_decompressed_write_counter;
+            page_cache_hit_counter += col_statistics.page_cache_hit_counter;
+            page_cache_missing_counter += 
col_statistics.page_cache_missing_counter;
+            page_cache_compressed_hit_counter += 
col_statistics.page_cache_compressed_hit_counter;
+            page_cache_decompressed_hit_counter +=
+                    col_statistics.page_cache_decompressed_hit_counter;
         }
     };
 
@@ -132,7 +167,8 @@ public:
                          cctz::time_zone* ctz, io::IOContext* io_ctx,
                          std::unique_ptr<ParquetColumnReader>& reader, size_t 
max_buf_size,
                          std::unordered_map<int, tparquet::OffsetIndex>& 
col_offsets,
-                         bool in_collection = false, const std::set<uint64_t>& 
column_ids = {},
+                         RuntimeState* state, bool in_collection = false,
+                         const std::set<uint64_t>& column_ids = {},
                          const std::set<uint64_t>& filter_column_ids = {});
     virtual const std::vector<level_t>& get_rep_level() const = 0;
     virtual const std::vector<level_t>& get_def_level() const = 0;
@@ -175,7 +211,8 @@ public:
               _chunk_meta(chunk_meta),
               _offset_index(offset_index) {}
     ~ScalarColumnReader() override { close(); }
-    Status init(io::FileReaderSPtr file, FieldSchema* field, size_t 
max_buf_size);
+    Status init(io::FileReaderSPtr file, FieldSchema* field, size_t 
max_buf_size,
+                RuntimeState* state);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                             const 
std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
                             FilterMap& filter_map, size_t batch_size, size_t* 
read_rows, bool* eof,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 543c8c44bd4..12ac55016ca 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -133,7 +133,7 @@ Status RowGroupReader::init(
         std::unique_ptr<ParquetColumnReader> reader;
         RETURN_IF_ERROR(ParquetColumnReader::create(
                 _file_reader, field, _row_group_meta, _read_ranges, _ctz, 
_io_ctx, reader,
-                max_buf_size, col_offsets, false, _column_ids, 
_filter_column_ids));
+                max_buf_size, col_offsets, _state, false, _column_ids, 
_filter_column_ids));
         if (reader == nullptr) {
             VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader 
failed";
             return Status::Corruption("Init row group reader failed");
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 3b6d7fdcb9b..70fdccd04a6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -17,6 +17,7 @@
 
 #include "vparquet_page_reader.h"
 
+#include <fmt/format.h>
 #include <gen_cpp/parquet_types.h>
 #include <stddef.h>
 #include <stdint.h>
@@ -26,6 +27,8 @@
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "io/fs/buffered_reader.h"
+#include "olap/page_cache.h"
+#include "parquet_common.h"
 #include "util/runtime_profile.h"
 #include "util/slice.h"
 #include "util/thrift_util.h"
@@ -40,10 +43,16 @@ namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 static constexpr size_t INIT_PAGE_HEADER_SIZE = 128;
 
+void ParquetPageCacheKeyBuilder::init(const std::string& path, int64_t mtime) {
+    _file_key_prefix = fmt::format("{}::{}", path, mtime);
+}
+
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 PageReader<IN_COLLECTION, OFFSET_INDEX>::PageReader(io::BufferedStreamReader* 
reader,
                                                     io::IOContext* io_ctx, 
uint64_t offset,
                                                     uint64_t length, size_t 
total_rows,
+                                                    const 
tparquet::ColumnMetaData& metadata,
+                                                    const 
ParquetPageReadContext& page_read_ctx,
                                                     const 
tparquet::OffsetIndex* offset_index)
         : _reader(reader),
           _io_ctx(io_ctx),
@@ -51,9 +60,12 @@ PageReader<IN_COLLECTION, 
OFFSET_INDEX>::PageReader(io::BufferedStreamReader* re
           _start_offset(offset),
           _end_offset(offset + length),
           _total_rows(total_rows),
+          _metadata(metadata),
+          _page_read_ctx(page_read_ctx),
           _offset_index(offset_index) {
     _next_header_offset = _offset;
     _state = INITIALIZED;
+    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
 
     if constexpr (OFFSET_INDEX) {
         _end_row = _offset_index->page_locations.size() >= 2
@@ -77,11 +89,72 @@ Status PageReader<IN_COLLECTION, 
OFFSET_INDEX>::parse_page_header() {
         return Status::IOError("Should skip or load current page to get next 
page");
     }
 
+    _page_statistics.page_read_counter += 1;
+
+    // Parse page header from file; header bytes are saved for possible cache 
insertion
     const uint8_t* page_header_buf = nullptr;
     size_t max_size = _end_offset - _offset;
     size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
     const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 
20;
     uint32_t real_header_size = 0;
+
+    // Try a header-only lookup in the page cache. Cached pages store
+    // header + optional v2 levels + uncompressed payload, so we can
+    // parse the page header directly from the cached bytes and avoid
+    // a file read for the header.
+    if (_page_read_ctx.enable_parquet_file_page_cache && 
!config::disable_storage_page_cache &&
+        StoragePageCache::instance() != nullptr) {
+        PageCacheHandle handle;
+        StoragePageCache::CacheKey key = 
make_page_cache_key(static_cast<int64_t>(_offset));
+        if (StoragePageCache::instance()->lookup(key, &handle, 
segment_v2::DATA_PAGE)) {
+            // Parse header directly from cached data
+            _page_cache_handle = std::move(handle);
+            Slice s = _page_cache_handle.data();
+            real_header_size = cast_set<uint32_t>(s.size);
+            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
+            auto st = deserialize_thrift_msg(reinterpret_cast<const 
uint8_t*>(s.data),
+                                             &real_header_size, true, 
&_cur_page_header);
+            if (!st.ok()) return st;
+            // Increment page cache counters for a true cache hit on 
header+payload
+            _page_statistics.page_cache_hit_counter += 1;
+            // Detect whether the cached payload is compressed or decompressed 
and record
+            bool is_cache_payload_decompressed =
+                    should_cache_decompressed(&_cur_page_header, _metadata);
+
+            if (is_cache_payload_decompressed) {
+                _page_statistics.page_cache_decompressed_hit_counter += 1;
+            } else {
+                _page_statistics.page_cache_compressed_hit_counter += 1;
+            }
+
+            _is_cache_payload_decompressed = is_cache_payload_decompressed;
+
+            if constexpr (OFFSET_INDEX == false) {
+                if (is_header_v2()) {
+                    _end_row = _start_row + 
_cur_page_header.data_page_header_v2.num_rows;
+                } else if constexpr (!IN_COLLECTION) {
+                    _end_row = _start_row + 
_cur_page_header.data_page_header.num_values;
+                }
+            }
+
+            // Save header bytes for later use (e.g., to insert updated cache 
entries)
+            _header_buf.assign(s.data, s.data + real_header_size);
+            _last_header_size = real_header_size;
+            _page_statistics.parse_page_header_num++;
+            _offset += real_header_size;
+            _next_header_offset = _offset + 
_cur_page_header.compressed_page_size;
+            _state = HEADER_PARSED;
+            return Status::OK();
+        } else {
+            _page_statistics.page_cache_missing_counter += 1;
+            // Clear any existing cache handle on miss to avoid holding stale 
handle
+            _page_cache_handle = PageCacheHandle();
+        }
+    }
+    // NOTE: page cache lookup for *decompressed* page data is handled in
+    // ColumnChunkReader::load_page_data(). PageReader should only be
+    // responsible for parsing the header bytes from the file and saving
+    // them in `_header_buf` for possible later insertion into the cache.
     while (true) {
         if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
             return Status::EndOfFile("stop");
@@ -115,6 +188,9 @@ Status PageReader<IN_COLLECTION, 
OFFSET_INDEX>::parse_page_header() {
         }
     }
 
+    // Save header bytes for possible cache insertion later
+    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
+    _last_header_size = real_header_size;
     _page_statistics.parse_page_header_num++;
     _offset += real_header_size;
     _next_header_offset = _offset + _cur_page_header.compressed_page_size;
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index 9246819d59c..9aa5ba3171c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -21,9 +21,12 @@
 #include <stdint.h>
 
 #include <memory>
+#include <string>
 
 #include "common/cast_set.h"
+#include "common/config.h"
 #include "common/status.h"
+#include "olap/page_cache.h"
 #include "util/block_compression.h"
 #include "vec/exec/format/parquet/parquet_common.h"
 namespace doris {
@@ -50,6 +53,35 @@ namespace doris::vectorized {
  * Use to deserialize parquet page header, and get the page data in iterator 
interface.
  */
 
+// Session-level options for parquet page reading/caching.
+struct ParquetPageReadContext {
+    bool enable_parquet_file_page_cache = true;
+    ParquetPageReadContext() = default;
+    ParquetPageReadContext(bool enable_parquet_file_page_cache)
+            : enable_parquet_file_page_cache(enable_parquet_file_page_cache) {}
+};
+
+inline bool should_cache_decompressed(const tparquet::PageHeader* header,
+                                      const tparquet::ColumnMetaData& 
metadata) {
+    if (header->compressed_page_size <= 0) return true;
+    if (metadata.codec == tparquet::CompressionCodec::UNCOMPRESSED) return 
true;
+
+    double ratio = static_cast<double>(header->uncompressed_page_size) /
+                   static_cast<double>(header->compressed_page_size);
+    return ratio <= config::parquet_page_cache_decompress_threshold;
+}
+
+class ParquetPageCacheKeyBuilder {
+public:
+    void init(const std::string& path, int64_t mtime);
+    StoragePageCache::CacheKey make_key(uint64_t end_offset, int64_t offset) 
const {
+        return StoragePageCache::CacheKey(_file_key_prefix, end_offset, 
offset);
+    }
+
+private:
+    std::string _file_key_prefix;
+};
+
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 class PageReader {
 public:
@@ -58,10 +90,19 @@ public:
         int64_t skip_page_header_num = 0;
         int64_t parse_page_header_num = 0;
         int64_t read_page_header_time = 0;
+        int64_t page_cache_hit_counter = 0;
+        int64_t page_cache_missing_counter = 0;
+        int64_t page_cache_compressed_hit_counter = 0;
+        int64_t page_cache_decompressed_hit_counter = 0;
+        int64_t page_cache_write_counter = 0;
+        int64_t page_cache_compressed_write_counter = 0;
+        int64_t page_cache_decompressed_write_counter = 0;
+        int64_t page_read_counter = 0;
     };
 
     PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx, 
uint64_t offset,
-               uint64_t length, size_t total_rows,
+               uint64_t length, size_t total_rows, const 
tparquet::ColumnMetaData& metadata,
+               const ParquetPageReadContext& page_read_ctx,
                const tparquet::OffsetIndex* offset_index = nullptr);
     ~PageReader() = default;
 
@@ -123,24 +164,53 @@ public:
         }
     }
 
-    Status get_page_header(const tparquet::PageHeader*& page_header) {
+    Status get_page_header(const tparquet::PageHeader** page_header) {
         if (UNLIKELY(_state != HEADER_PARSED)) {
             return Status::InternalError("Page header not parsed");
         }
-        page_header = &_cur_page_header;
+        *page_header = &_cur_page_header;
         return Status::OK();
     }
 
     Status get_page_data(Slice& slice);
 
+    // Skip page data and update offset (used when data is loaded from cache)
+    void skip_page_data() {
+        if (_state == HEADER_PARSED) {
+            _offset += _cur_page_header.compressed_page_size;
+            _state = DATA_LOADED;
+        }
+    }
+
+    const std::vector<uint8_t>& header_bytes() const { return _header_buf; }
+    // header start offset for current page
+    int64_t header_start_offset() const {
+        return static_cast<int64_t>(_next_header_offset) - 
static_cast<int64_t>(_last_header_size) -
+               static_cast<int64_t>(_cur_page_header.compressed_page_size);
+    }
+    uint64_t file_end_offset() const { return _end_offset; }
+    bool cached_decompressed() const {
+        return should_cache_decompressed(&_cur_page_header, _metadata);
+    }
+
     PageStatistics& page_statistics() { return _page_statistics; }
 
     bool is_header_v2() { return _cur_page_header.__isset.data_page_header_v2; 
}
 
+    // Returns whether the current page's cache payload is decompressed
+    bool is_cache_payload_decompressed() const { return 
_is_cache_payload_decompressed; }
+
     size_t start_row() const { return _start_row; }
 
     size_t end_row() const { return _end_row; }
 
+    // Accessors for cache handle
+    bool has_page_cache_handle() const { return _page_cache_handle.cache() != 
nullptr; }
+    const doris::PageCacheHandle& page_cache_handle() const { return 
_page_cache_handle; }
+    StoragePageCache::CacheKey make_page_cache_key(int64_t offset) const {
+        return _page_cache_key_builder.make_key(_end_offset, offset);
+    }
+
 private:
     enum PageReaderState { INITIALIZED, HEADER_PARSED, DATA_LOADED };
     PageReaderState _state = INITIALIZED;
@@ -159,19 +229,33 @@ private:
     size_t _end_row = 0;
     // total rows in this column chunk
     size_t _total_rows = 0;
+    // Column metadata for this column chunk
+    const tparquet::ColumnMetaData& _metadata;
+    // Session-level parquet page cache options
+    ParquetPageReadContext _page_read_ctx;
     // for page index
     size_t _page_index = 0;
     const tparquet::OffsetIndex* _offset_index;
 
     tparquet::PageHeader _cur_page_header;
+    bool _is_cache_payload_decompressed = true;
+
+    // Page cache members
+    ParquetPageCacheKeyBuilder _page_cache_key_builder;
+    doris::PageCacheHandle _page_cache_handle;
+    // stored header bytes when cache miss so we can insert header+payload 
into cache
+    std::vector<uint8_t> _header_buf;
+    // last parsed header size in bytes
+    uint32_t _last_header_size = 0;
 };
 
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 std::unique_ptr<PageReader<IN_COLLECTION, OFFSET_INDEX>> create_page_reader(
         io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t 
offset, uint64_t length,
-        size_t total_rows, const tparquet::OffsetIndex* offset_index = 
nullptr) {
-    return std::make_unique<PageReader<IN_COLLECTION, OFFSET_INDEX>>(reader, 
io_ctx, offset, length,
-                                                                     
total_rows, offset_index);
+        size_t total_rows, const tparquet::ColumnMetaData& metadata,
+        const ParquetPageReadContext& ctx, const tparquet::OffsetIndex* 
offset_index = nullptr) {
+    return std::make_unique<PageReader<IN_COLLECTION, OFFSET_INDEX>>(
+            reader, io_ctx, offset, length, total_rows, metadata, ctx, 
offset_index);
 }
 #include "common/compile_check_end.h"
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index a84ee2b9d2c..de99ad2170c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -194,6 +194,22 @@ void ParquetReader::_init_profile() {
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecompressTime", 
parquet_profile, 1);
         _parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL(
                 _profile, "DecompressCount", TUnit::UNIT, parquet_profile, 1);
+        _parquet_profile.page_read_counter = ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageReadCount", TUnit::UNIT, parquet_profile, 1);
+        _parquet_profile.page_cache_write_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheWriteCount", TUnit::UNIT, parquet_profile, 
1);
+        _parquet_profile.page_cache_compressed_write_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheCompressedWriteCount", TUnit::UNIT, 
parquet_profile, 1);
+        _parquet_profile.page_cache_decompressed_write_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheDecompressedWriteCount", TUnit::UNIT, 
parquet_profile, 1);
+        _parquet_profile.page_cache_hit_counter = ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheHitCount", TUnit::UNIT, parquet_profile, 
1);
+        _parquet_profile.page_cache_missing_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheMissingCount", TUnit::UNIT, 
parquet_profile, 1);
+        _parquet_profile.page_cache_compressed_hit_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheCompressedHitCount", TUnit::UNIT, 
parquet_profile, 1);
+        _parquet_profile.page_cache_decompressed_hit_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheDecompressedHitCount", TUnit::UNIT, 
parquet_profile, 1);
         _parquet_profile.decode_header_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageHeaderDecodeTime", 
parquet_profile, 1);
         _parquet_profile.read_page_header_time =
@@ -1271,6 +1287,21 @@ void ParquetReader::_collect_profile() {
                    _column_statistics.page_index_read_calls);
     COUNTER_UPDATE(_parquet_profile.decompress_time, 
_column_statistics.decompress_time);
     COUNTER_UPDATE(_parquet_profile.decompress_cnt, 
_column_statistics.decompress_cnt);
+    COUNTER_UPDATE(_parquet_profile.page_read_counter, 
_column_statistics.page_read_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_write_counter,
+                   _column_statistics.page_cache_write_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_compressed_write_counter,
+                   _column_statistics.page_cache_compressed_write_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_decompressed_write_counter,
+                   _column_statistics.page_cache_decompressed_write_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_hit_counter,
+                   _column_statistics.page_cache_hit_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_missing_counter,
+                   _column_statistics.page_cache_missing_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_compressed_hit_counter,
+                   _column_statistics.page_cache_compressed_hit_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_decompressed_hit_counter,
+                   _column_statistics.page_cache_decompressed_hit_counter);
     COUNTER_UPDATE(_parquet_profile.decode_header_time, 
_column_statistics.decode_header_time);
     COUNTER_UPDATE(_parquet_profile.read_page_header_time,
                    _column_statistics.read_page_header_time);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index b6451ad15d4..dcd9e99f393 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -195,6 +195,14 @@ private:
         RuntimeProfile::Counter* file_footer_hit_cache = nullptr;
         RuntimeProfile::Counter* decompress_time = nullptr;
         RuntimeProfile::Counter* decompress_cnt = nullptr;
+        RuntimeProfile::Counter* page_read_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_write_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_compressed_write_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_decompressed_write_counter = 
nullptr;
+        RuntimeProfile::Counter* page_cache_hit_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_missing_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_compressed_hit_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_decompressed_hit_counter = nullptr;
         RuntimeProfile::Counter* decode_header_time = nullptr;
         RuntimeProfile::Counter* read_page_header_time = nullptr;
         RuntimeProfile::Counter* decode_value_time = nullptr;
diff --git a/be/test/io/fs/buffered_reader_test.cpp 
b/be/test/io/fs/buffered_reader_test.cpp
index 3874b06c68c..bc92d22b178 100644
--- a/be/test/io/fs/buffered_reader_test.cpp
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -68,6 +68,8 @@ public:
 
     bool closed() const override { return _reader->closed(); }
 
+    int64_t mtime() const override { return _reader->mtime(); }
+
 private:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const io::IOContext* io_ctx) override {
@@ -96,6 +98,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return 0; }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const io::IOContext* io_ctx) override {
@@ -130,6 +134,8 @@ public:
 
     bool closed() const override { return _delegate->closed(); }
 
+    int64_t mtime() const override { return _delegate->mtime(); }
+
     const io::PrefetchRange& last_read_range() const { return 
*_last_read_range; }
 
 protected:
diff --git a/be/test/io/fs/packed_file_concurrency_test.cpp 
b/be/test/io/fs/packed_file_concurrency_test.cpp
index 31c2db19fb8..1e581ff59b8 100644
--- a/be/test/io/fs/packed_file_concurrency_test.cpp
+++ b/be/test/io/fs/packed_file_concurrency_test.cpp
@@ -411,6 +411,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return 0; }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* /*io_ctx*/) override {
diff --git a/be/test/io/fs/packed_file_reader_test.cpp 
b/be/test/io/fs/packed_file_reader_test.cpp
index feaf5c37f3d..2b0472c38a5 100644
--- a/be/test/io/fs/packed_file_reader_test.cpp
+++ b/be/test/io/fs/packed_file_reader_test.cpp
@@ -69,6 +69,8 @@ protected:
         return Status::OK();
     }
 
+    int64_t mtime() const override { return 0; }
+
 private:
     Path _path = Path("mock_file");
     std::string _content;
diff --git a/be/test/io/fs/packed_file_system_test.cpp 
b/be/test/io/fs/packed_file_system_test.cpp
index 7f8a4af63df..096bb8c4a93 100644
--- a/be/test/io/fs/packed_file_system_test.cpp
+++ b/be/test/io/fs/packed_file_system_test.cpp
@@ -53,6 +53,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return 0; }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* /*io_ctx*/) override {
diff --git a/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp 
b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
index 3aef8db8459..5ea6335d2b9 100644
--- a/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
+++ b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
@@ -38,6 +38,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return 0; }
+
     Status close() override {
         _closed = true;
         return Status::OK();
diff --git a/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp 
b/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp
new file mode 100644
index 00000000000..c415d264e8e
--- /dev/null
+++ b/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp
@@ -0,0 +1,804 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <fmt/format.h>
+#include <gtest/gtest.h>
+
+#include "common/config.h"
+#include "io/fs/buffered_reader.h"
+#include "olap/page_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/cache_manager.h"
+#include "util/block_compression.h"
+#include "util/faststring.h"
+#include "util/thrift_util.h"
+#include "vec/exec/format/parquet/schema_desc.h"
+#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
+#include "vec/exec/format/parquet/vparquet_page_reader.h"
+
+using namespace doris;
+using namespace doris::vectorized;
+
+class FakeBufferedReader : public io::BufferedStreamReader {
+public:
+    FakeBufferedReader(std::string path, std::vector<uint8_t> data)
+            : _path(std::move(path)), _data(std::move(data)) {}
+    Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t 
bytes_to_read,
+                      const doris::io::IOContext* io_ctx) override {
+        if (offset + bytes_to_read > _data.size()) return Status::IOError("Out 
of bounds");
+        *buf = _data.data() + offset;
+        return Status::OK();
+    }
+    Status read_bytes(Slice& slice, uint64_t offset, const 
doris::io::IOContext* io_ctx) override {
+        if (offset + slice.size > _data.size()) return Status::IOError("Out of 
bounds");
+        slice.data = reinterpret_cast<char*>(_data.data() + offset);
+        return Status::OK();
+    }
+    std::string path() override { return _path; }
+
+    int64_t mtime() const override { return 0; }
+
+private:
+    std::string _path;
+    std::vector<uint8_t> _data;
+};
+
+TEST(ParquetPageCacheTest, CacheHitReturnsDecompressedPayload) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+
+    // construct thrift PageHeader (uncompressed payload) and payload
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    header.__set_compressed_page_size(4);
+    header.__set_uncompressed_page_size(4);
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> payload = {0x11, 0x22, 0x33, 0x44};
+    std::vector<uint8_t> cached_data;
+    cached_data.insert(cached_data.end(), header_bytes.begin(), 
header_bytes.end());
+    cached_data.insert(cached_data.end(), payload.begin(), payload.end());
+
+    std::string path = "test_parquet_cache_file";
+    int64_t header_offset = 128;
+    // make file_end_offset consistent with reader/page reader end offset used 
in test
+    int64_t file_end_offset = header_offset + 
static_cast<int64_t>(cached_data.size());
+
+    // insert into cache
+    int64_t mtime = 0;
+    StoragePageCache::CacheKey key(fmt::format("{}::{}", path, mtime),
+                                   static_cast<size_t>(file_end_offset), 
header_offset);
+    size_t total = cached_data.size();
+    auto* page = new DataPage(total, true, segment_v2::DATA_PAGE);
+    memcpy(page->data(), cached_data.data(), total);
+    page->reset_size(total);
+    PageCacheHandle handle;
+    StoragePageCache::instance()->insert(key, page, &handle, 
segment_v2::DATA_PAGE);
+
+    // create fake reader and a ColumnChunkReader to verify cache hit
+    // ensure the reader contains the same header+payload at the header offset 
so header parsing succeeds
+    std::vector<uint8_t> backing(256, 0);
+    memcpy(backing.data() + header_offset, cached_data.data(), total);
+    FakeBufferedReader reader(path, backing);
+    // prepare column chunk metadata so ColumnChunkReader uses same offsets
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(header_offset);
+    cc.meta_data.__set_total_compressed_size(total);
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    FieldSchema field_schema;
+    field_schema.repetition_level = 0;
+    field_schema.definition_level = 0;
+
+    ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, nullptr, 
0, nullptr, ctx);
+    ASSERT_TRUE(ccr.init().ok());
+    // load_page_data should hit the cache and return decompressed payload
+    ASSERT_TRUE(ccr.load_page_data().ok());
+    Slice s = ccr.get_page_data();
+    ASSERT_EQ(s.size, payload.size());
+    ASSERT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+    // stats: ensure there was a page read and at least one hit recorded
+    auto& statistics = ccr.statistics();
+    EXPECT_EQ(statistics.page_read_counter, 1);
+    EXPECT_EQ(statistics.page_cache_hit_counter, 1);
+    EXPECT_EQ(statistics.page_cache_decompressed_hit_counter, 1);
+}
+
+TEST(ParquetPageCacheTest, DecompressedPageInsertedByColumnChunkReader) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // ensure decompressed pages are cached via BE config
+    double old_thresh = config::parquet_page_cache_decompress_threshold;
+    bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+    config::parquet_page_cache_decompress_threshold = 100.0;
+    config::enable_parquet_cache_compressed_pages = false;
+
+    // construct uncompressed header + payload in file buffer
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    header.__set_compressed_page_size(4);
+    header.__set_uncompressed_page_size(4);
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> payload = {0x55, 0x66, 0x77, 0x88};
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), 
header_bytes.end());
+    file_data.insert(file_data.end(), payload.begin(), payload.end());
+
+    std::string path = "test_parquet_insert_file";
+    int64_t header_offset = 0;
+
+    FakeBufferedReader reader(path, file_data);
+
+    // prepare column chunk metadata
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(header_offset);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    {
+        FieldSchema field_schema;
+        field_schema.repetition_level = 0;
+        field_schema.definition_level = 0;
+        ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, 
nullptr, 0, nullptr, ctx);
+        ASSERT_TRUE(ccr.init().ok());
+        ASSERT_TRUE(ccr.load_page_data().ok());
+
+        // Now cache should have an entry; verify by creating a fresh 
ColumnChunkReader and hitting cache
+        ColumnChunkReader<false, false> ccr_check(&reader, &cc, &field_schema, 
nullptr, 0, nullptr,
+                                                  ctx);
+        ASSERT_TRUE(ccr_check.init().ok());
+        // ASSERT_TRUE(ccr_check.next_page().ok());
+        ASSERT_TRUE(ccr_check.load_page_data().ok());
+        Slice s = ccr_check.get_page_data();
+        ASSERT_EQ(s.size, payload.size());
+        EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+        EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1);
+    }
+    // restore config
+    config::parquet_page_cache_decompress_threshold = old_thresh;
+    config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
+
+TEST(ParquetPageCacheTest, V2LevelsPreservedInCache) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // ensure decompressed pages are cached via BE config
+    double old_thresh = config::parquet_page_cache_decompress_threshold;
+    bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+    config::parquet_page_cache_decompress_threshold = 100.0;
+    config::enable_parquet_cache_compressed_pages = false;
+
+    // construct v2 header + levels + payload in file buffer (uncompressed)
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE_V2;
+    int rl = 2;
+    int dl = 1;
+    int payload_sz = 2;
+    header.__set_compressed_page_size(rl + dl + payload_sz);
+    header.__set_uncompressed_page_size(rl + dl + payload_sz);
+    header.__isset.data_page_header_v2 = true;
+    header.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+    header.data_page_header_v2.__set_definition_levels_byte_length(dl);
+    header.data_page_header_v2.__set_is_compressed(false);
+    header.data_page_header_v2.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+    std::vector<uint8_t> payload = {0xAA, 0xBB};
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), 
header_bytes.end());
+    file_data.insert(file_data.end(), level_bytes.begin(), level_bytes.end());
+    file_data.insert(file_data.end(), payload.begin(), payload.end());
+
+    std::string path = "test_v2_levels_file";
+    FakeBufferedReader reader(path, file_data);
+
+    // prepare column chunk metadata
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    FieldSchema field_schema;
+    field_schema.repetition_level = 0;
+    field_schema.definition_level = 0;
+    {
+        ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, 
nullptr, 0, nullptr, ctx);
+        ASSERT_TRUE(ccr.init().ok());
+        ASSERT_TRUE(ccr.load_page_data().ok());
+
+        // Now cache should have entry; verify by creating a ColumnChunkReader 
and hitting cache
+        ColumnChunkReader<false, false> ccr_check(&reader, &cc, &field_schema, 
nullptr, 0, nullptr,
+                                                  ctx);
+        ASSERT_TRUE(ccr_check.init().ok());
+        ASSERT_TRUE(ccr_check.load_page_data().ok());
+        Slice s = ccr_check.get_page_data();
+        ASSERT_EQ(s.size, payload.size());
+        EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+    }
+
+    // Verify that a fresh ColumnChunkReader reusing cache gets level bytes 
preserved
+    FieldSchema field_schema2;
+    field_schema2.repetition_level = 2; // v2 levels present
+    field_schema2.definition_level = 1;
+    ColumnChunkReader<false, false> ccr2(&reader, &cc, &field_schema2, 
nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr2.init().ok());
+    ASSERT_TRUE(ccr2.load_page_data().ok());
+    // Level slices should equal the original level bytes
+    const Slice& rep = ccr2.v2_rep_levels();
+    const Slice& def = ccr2.v2_def_levels();
+    auto& statistics = ccr2.statistics();
+    EXPECT_GT(statistics.page_cache_hit_counter, 0);
+    // because threshold is set to cache decompressed, we should see 
decompressed hits
+    EXPECT_GT(statistics.page_cache_decompressed_hit_counter, 0);
+    ASSERT_EQ(def.size, dl);
+    EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl));
+    EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl));
+    // restore config
+    config::parquet_page_cache_decompress_threshold = old_thresh;
+    config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
+
+TEST(ParquetPageCacheTest, CompressedV1PageCachedAndHit) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+
+    // construct compressed v1 header + compressed payload in file buffer
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+
+    std::vector<uint8_t> payload = {0x01, 0x02, 0x03, 0x04};
+
+    // compress payload using a block codec
+    BlockCompressionCodec* codec = nullptr;
+    
ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, 
&codec).ok());
+    faststring compressed_fast;
+    std::vector<Slice> inputs;
+    inputs.emplace_back(payload.data(), payload.size());
+    ASSERT_TRUE(codec->compress(inputs, payload.size(), 
&compressed_fast).ok());
+
+    
header.__set_compressed_page_size(static_cast<int32_t>(compressed_fast.size()));
+    header.__set_uncompressed_page_size(static_cast<int32_t>(payload.size()));
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), 
header_bytes.end());
+    file_data.insert(file_data.end(), compressed_fast.data(),
+                     compressed_fast.data() + compressed_fast.size());
+
+    std::string path = "test_compressed_v1_file";
+    FakeBufferedReader reader(path, file_data);
+
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+    FieldSchema field_schema;
+    field_schema.repetition_level = 0;
+    field_schema.definition_level = 0;
+
+    // Load page to trigger decompression + cache insert
+    ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, nullptr, 
0, nullptr, ctx);
+    ASSERT_TRUE(ccr.init().ok());
+    ASSERT_TRUE(ccr.load_page_data().ok());
+    EXPECT_EQ(ccr.statistics().page_cache_write_counter, 1);
+
+    // Now verify a fresh reader hits the cache and returns payload
+    ColumnChunkReader<false, false> ccr_check(&reader, &cc, &field_schema, 
nullptr, 0, nullptr,
+                                              ctx);
+    ASSERT_TRUE(ccr_check.init().ok());
+    // ASSERT_TRUE(ccr_check.next_page().ok());
+    ASSERT_TRUE(ccr_check.load_page_data().ok());
+    Slice s = ccr_check.get_page_data();
+    ASSERT_EQ(s.size, payload.size());
+    EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+    EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1);
+}
+
+TEST(ParquetPageCacheTest, CompressedV2LevelsPreservedInCache) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+
+    // construct v2 header + levels + compressed payload in file buffer
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE_V2;
+    int rl = 2;
+    int dl = 1;
+    //int payload_sz = 2;
+    header.__isset.data_page_header_v2 = true;
+    header.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+    header.data_page_header_v2.__set_definition_levels_byte_length(dl);
+    header.data_page_header_v2.__set_is_compressed(true);
+    header.data_page_header_v2.__set_num_values(1);
+
+    std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+    std::vector<uint8_t> payload = {0xAA, 0xBB};
+
+    // compress payload
+    BlockCompressionCodec* codec = nullptr;
+    
ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, 
&codec).ok());
+    faststring compressed_fast;
+    std::vector<Slice> inputs;
+    inputs.emplace_back(payload.data(), payload.size());
+    ASSERT_TRUE(codec->compress(inputs, payload.size(), 
&compressed_fast).ok());
+
+    // compressed page: levels (uncompressed) followed by compressed payload
+    std::vector<uint8_t> compressed_page;
+    compressed_page.insert(compressed_page.end(), level_bytes.begin(), 
level_bytes.end());
+    compressed_page.insert(compressed_page.end(), compressed_fast.data(),
+                           compressed_fast.data() + compressed_fast.size());
+
+    
header.__set_compressed_page_size(static_cast<int32_t>(compressed_page.size()));
+    
header.__set_uncompressed_page_size(static_cast<int32_t>(level_bytes.size() + 
payload.size()));
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), 
header_bytes.end());
+    file_data.insert(file_data.end(), compressed_page.begin(), 
compressed_page.end());
+
+    std::string path = "test_compressed_v2_file";
+    FakeBufferedReader reader(path, file_data);
+
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+    FieldSchema field_schema;
+    field_schema.repetition_level = 0;
+    field_schema.definition_level = 0;
+
+    // Load page to trigger decompression + cache insert
+    ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, nullptr, 
0, nullptr, ctx);
+    ASSERT_TRUE(ccr.init().ok());
+    ASSERT_TRUE(ccr.load_page_data().ok());
+    EXPECT_EQ(ccr.statistics().page_cache_write_counter, 1);
+
+    // Now verify a fresh reader hits the cache and v2 levels are preserved
+    FieldSchema field_schema2;
+    field_schema2.repetition_level = rl;
+    field_schema2.definition_level = dl;
+    ColumnChunkReader<false, false> ccr_check(&reader, &cc, &field_schema2, 
nullptr, 0, nullptr,
+                                              ctx);
+    ASSERT_TRUE(ccr_check.init().ok());
+    ASSERT_TRUE(ccr_check.load_page_data().ok());
+    Slice s = ccr_check.get_page_data();
+    ASSERT_EQ(s.size, payload.size());
+    EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+    const Slice& rep = ccr_check.v2_rep_levels();
+    const Slice& def = ccr_check.v2_def_levels();
+    ASSERT_EQ(rep.size, rl);
+    ASSERT_EQ(def.size, dl);
+    // assumes the default decompress threshold caches this v2 page 
decompressed (not set in this test — confirm); counter should reflect it
+    EXPECT_GT(ccr_check.statistics().page_cache_decompressed_hit_counter, 0);
+    EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl));
+    EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl));
+}
+
+TEST(ParquetPageCacheTest, MultiPagesMixedV1V2CacheHit) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+
+    // Prepare a v1 uncompressed page and a v2 uncompressed page and insert 
both into cache
+    std::string path = "test_multi_pages_file";
+
+    // v1 page
+    tparquet::PageHeader hdr1;
+    hdr1.type = tparquet::PageType::DATA_PAGE;
+    hdr1.__set_compressed_page_size(4);
+    hdr1.__set_uncompressed_page_size(4);
+    hdr1.__isset.data_page_header = true;
+    hdr1.data_page_header.__set_num_values(1);
+    std::vector<uint8_t> header1_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&hdr1, &header1_bytes).ok());
+    std::vector<uint8_t> payload1 = {0x10, 0x20, 0x30, 0x40};
+    std::vector<uint8_t> cached1;
+    cached1.insert(cached1.end(), header1_bytes.begin(), header1_bytes.end());
+    cached1.insert(cached1.end(), payload1.begin(), payload1.end());
+
+    // v2 page
+    tparquet::PageHeader hdr2;
+    hdr2.type = tparquet::PageType::DATA_PAGE_V2;
+    int rl = 2;
+    int dl = 1;
+    int payload2_sz = 2;
+    hdr2.__set_compressed_page_size(rl + dl + payload2_sz);
+    hdr2.__set_uncompressed_page_size(rl + dl + payload2_sz);
+    hdr2.__isset.data_page_header_v2 = true;
+    hdr2.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+    hdr2.data_page_header_v2.__set_definition_levels_byte_length(dl);
+    hdr2.data_page_header_v2.__set_is_compressed(false);
+    hdr2.data_page_header_v2.__set_num_values(1);
+    std::vector<uint8_t> header2_bytes;
+    ASSERT_TRUE(ts.serialize(&hdr2, &header2_bytes).ok());
+    std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+    std::vector<uint8_t> payload2 = {0xAA, 0xBB};
+    std::vector<uint8_t> cached2;
+    cached2.insert(cached2.end(), header2_bytes.begin(), header2_bytes.end());
+    cached2.insert(cached2.end(), level_bytes.begin(), level_bytes.end());
+    cached2.insert(cached2.end(), payload2.begin(), payload2.end());
+
+    // Insert both pages into cache under different header offsets
+    size_t total1 = cached1.size();
+    auto* page1 = new DataPage(total1, true, segment_v2::DATA_PAGE);
+    memcpy(page1->data(), cached1.data(), total1);
+    page1->reset_size(total1);
+    PageCacheHandle h1;
+    size_t header1_start = 128;
+    int64_t mtime = 0;
+    StoragePageCache::CacheKey key1(fmt::format("{}::{}", path, mtime),
+                                    static_cast<size_t>(header1_start + 
total1), header1_start);
+    StoragePageCache::instance()->insert(key1, page1, &h1, 
segment_v2::DATA_PAGE);
+
+    size_t total2 = cached2.size();
+    auto* page2 = new DataPage(total2, true, segment_v2::DATA_PAGE);
+    memcpy(page2->data(), cached2.data(), total2);
+    page2->reset_size(total2);
+    PageCacheHandle h2;
+    size_t header2_start = 256;
+    StoragePageCache::CacheKey key2(fmt::format("{}::{}", path, mtime),
+                                    static_cast<size_t>(header2_start + 
total2), header2_start);
+    StoragePageCache::instance()->insert(key2, page2, &h2, 
segment_v2::DATA_PAGE);
+
+    // Now create readers that would lookup those cache keys
+    // Reader1 must expose header+page bytes at offset header1_start
+    std::vector<uint8_t> reader_backing1(3000, 0);
+    memcpy(reader_backing1.data() + header1_start, cached1.data(), total1);
+    FakeBufferedReader reader1(path, reader_backing1);
+    tparquet::ColumnChunk cc1;
+    cc1.meta_data.__set_data_page_offset(128);
+    cc1.meta_data.__set_total_compressed_size(total1);
+    cc1.meta_data.__set_num_values(1);
+    cc1.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+    FieldSchema field_schema1;
+    field_schema1.repetition_level = 0;
+    field_schema1.definition_level = 0;
+    ColumnChunkReader<false, false> ccr1(&reader1, &cc1, &field_schema1, 
nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr1.init().ok());
+    ASSERT_TRUE(ccr1.load_page_data().ok());
+    Slice s1 = ccr1.get_page_data();
+    ASSERT_EQ(s1.size, payload1.size());
+    EXPECT_EQ(0, memcmp(s1.data, payload1.data(), payload1.size()));
+
+    std::vector<uint8_t> reader_backing2(3000, 0);
+    memcpy(reader_backing2.data() + header2_start, cached2.data(), total2);
+    FakeBufferedReader reader2(path, reader_backing2);
+    tparquet::ColumnChunk cc2;
+    cc2.meta_data.__set_data_page_offset(256);
+    cc2.meta_data.__set_total_compressed_size(total2);
+    cc2.meta_data.__set_num_values(1);
+    cc2.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+    FieldSchema field_schema2;
+    field_schema2.repetition_level = rl;
+    field_schema2.definition_level = dl;
+    ColumnChunkReader<false, false> ccr2(&reader2, &cc2, &field_schema2, 
nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr2.init().ok());
+    ASSERT_TRUE(ccr2.load_page_data().ok());
+    Slice s2 = ccr2.get_page_data();
+    ASSERT_EQ(s2.size, payload2.size());
+    EXPECT_EQ(0, memcmp(s2.data, payload2.data(), payload2.size()));
+    const Slice& rep = ccr2.v2_rep_levels();
+    const Slice& def = ccr2.v2_def_levels();
+    ASSERT_EQ(rep.size, rl);
+    ASSERT_EQ(def.size, dl);
+    EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl));
+    EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl));
+}
+
+TEST(ParquetPageCacheTest, CacheMissThenHit) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+
+    // uncompressed v1 page
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    header.__set_compressed_page_size(4);
+    header.__set_uncompressed_page_size(4);
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+    std::vector<uint8_t> payload = {0xDE, 0xAD, 0xBE, 0xEF};
+    std::vector<uint8_t> backing(256, 0);
+    std::vector<uint8_t> cached;
+    cached.insert(cached.end(), header_bytes.begin(), header_bytes.end());
+    cached.insert(cached.end(), payload.begin(), payload.end());
+    int64_t header_offset = 64;
+    memcpy(backing.data() + header_offset, cached.data(), cached.size());
+
+    std::string path = "test_miss_then_hit";
+    FakeBufferedReader reader(path, backing);
+
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(header_offset);
+    cc.meta_data.__set_total_compressed_size(cached.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    FieldSchema fs;
+    fs.repetition_level = 0;
+    fs.definition_level = 0;
+
+    // First reader: should not hit cache, but should write cache
+    ColumnChunkReader<false, false> ccr(&reader, &cc, &fs, nullptr, 0, 
nullptr, ctx);
+    ASSERT_TRUE(ccr.init().ok());
+    ASSERT_TRUE(ccr.load_page_data().ok());
+    auto& statistics = ccr.statistics();
+    EXPECT_EQ(statistics.page_cache_hit_counter, 0);
+    EXPECT_EQ(statistics.page_cache_write_counter, 1);
+
+    // Second reader: should hit cache
+    ColumnChunkReader<false, false> ccr2(&reader, &cc, &fs, nullptr, 0, 
nullptr, ctx);
+    ASSERT_TRUE(ccr2.init().ok());
+    ASSERT_TRUE(ccr2.load_page_data().ok());
+    auto& statistics2 = ccr2.statistics();
+    EXPECT_EQ(statistics2.page_cache_hit_counter, 1);
+    EXPECT_EQ(statistics2.page_cache_decompressed_hit_counter, 1);
+}
+
+TEST(ParquetPageCacheTest, DecompressThresholdCachesCompressed) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+
+    // prepare a compressible payload (lots of zeros)
+    std::vector<uint8_t> payload(1024, 0);
+
+    // compress payload using snappy
+    BlockCompressionCodec* codec = nullptr;
+    
ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, 
&codec).ok());
+    faststring compressed_fast;
+    std::vector<Slice> inputs;
+    inputs.emplace_back(payload.data(), payload.size());
+    ASSERT_TRUE(codec->compress(inputs, payload.size(), 
&compressed_fast).ok());
+
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    
header.__set_compressed_page_size(static_cast<int32_t>(compressed_fast.size()));
+    header.__set_uncompressed_page_size(static_cast<int32_t>(payload.size()));
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), 
header_bytes.end());
+    file_data.insert(file_data.end(), compressed_fast.data(),
+                     compressed_fast.data() + compressed_fast.size());
+
+    std::string path = "test_threshold_file_compressed";
+    FakeBufferedReader reader(path, file_data);
+
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+    FieldSchema fs;
+    fs.repetition_level = 0;
+    fs.definition_level = 0;
+
+    // Case: very small threshold -> cache the compressed payload (smaller 
footprint)
+    double old_thresh = config::parquet_page_cache_decompress_threshold;
+    bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+    config::parquet_page_cache_decompress_threshold = 0.1;
+    config::enable_parquet_cache_compressed_pages = true;
+    ColumnChunkReader<false, false> ccr_small_thresh(&reader, &cc, &fs, 
nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr_small_thresh.init().ok());
+    // ASSERT_TRUE(ccr_small_thresh.next_page().ok());
+    ASSERT_TRUE(ccr_small_thresh.load_page_data().ok());
+    EXPECT_EQ(ccr_small_thresh.statistics().page_cache_write_counter, 1);
+
+    // Inspect cache entry: payload stored should be compressed size
+    PageCacheHandle handle_small;
+    size_t file_end = header_bytes.size() + compressed_fast.size();
+    int64_t mtime = 0;
+    StoragePageCache::CacheKey key_small(fmt::format("{}::{}", path, mtime),
+                                         /*file_end_offset*/ file_end, 
/*header_start*/ 0);
+    bool found_small =
+            StoragePageCache::instance()->lookup(key_small, &handle_small, 
segment_v2::DATA_PAGE);
+    ASSERT_TRUE(found_small);
+    Slice cached_small = handle_small.data();
+    size_t header_size = header_bytes.size();
+    size_t payload_in_cache_size = cached_small.size - header_size; // no 
levels here
+    ASSERT_EQ(payload_in_cache_size, compressed_fast.size());
+
+    // restore config
+    config::parquet_page_cache_decompress_threshold = old_thresh;
+    config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
+
+TEST(ParquetPageCacheTest, DecompressThresholdCachesDecompressed) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+
+    // prepare a compressible payload (lots of zeros)
+    std::vector<uint8_t> payload(1024, 0);
+
+    // compress payload using snappy
+    BlockCompressionCodec* codec = nullptr;
+    
ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, 
&codec).ok());
+    faststring compressed_fast;
+    std::vector<Slice> inputs;
+    inputs.emplace_back(payload.data(), payload.size());
+    ASSERT_TRUE(codec->compress(inputs, payload.size(), 
&compressed_fast).ok());
+
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    
header.__set_compressed_page_size(static_cast<int32_t>(compressed_fast.size()));
+    header.__set_uncompressed_page_size(static_cast<int32_t>(payload.size()));
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), 
header_bytes.end());
+    file_data.insert(file_data.end(), compressed_fast.data(),
+                     compressed_fast.data() + compressed_fast.size());
+
+    std::string path = "test_threshold_file_decompressed";
+    FakeBufferedReader reader(path, file_data);
+
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+    FieldSchema fs;
+    fs.repetition_level = 0;
+    fs.definition_level = 0;
+
+    // Case: very large threshold -> cache decompressed payload
+    double old_thresh = config::parquet_page_cache_decompress_threshold;
+    bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+    config::parquet_page_cache_decompress_threshold = 100.0;
+    config::enable_parquet_cache_compressed_pages = false;
+    ColumnChunkReader<false, false> ccr_large_thresh(&reader, &cc, &fs, 
nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr_large_thresh.init().ok());
+    // ASSERT_TRUE(ccr_large_thresh.next_page().ok());
+    ASSERT_TRUE(ccr_large_thresh.load_page_data().ok());
+    EXPECT_EQ(ccr_large_thresh.statistics().page_cache_write_counter, 1);
+
+    // Inspect cache entry for large threshold: payload stored should be 
uncompressed size
+    PageCacheHandle handle_large;
+    size_t file_end = header_bytes.size() + compressed_fast.size();
+    int64_t mtime = 0;
+    StoragePageCache::CacheKey key_large(fmt::format("{}::{}", path, mtime),
+                                         /*file_end_offset*/ file_end, 
/*header_start*/ 0);
+    bool found_large =
+            StoragePageCache::instance()->lookup(key_large, &handle_large, 
segment_v2::DATA_PAGE);
+    ASSERT_TRUE(found_large);
+    Slice cached_large = handle_large.data();
+    size_t payload_in_cache_size_large = cached_large.size - 
header_bytes.size();
+    ASSERT_EQ(payload_in_cache_size_large, payload.size());
+
+    // Verify cache hit for a new reader (should hit the decompressed entry we 
just created)
+    ColumnChunkReader<false, false> ccr_check(&reader, &cc, &fs, nullptr, 0, 
nullptr, ctx);
+    ASSERT_TRUE(ccr_check.init().ok());
+    // ASSERT_TRUE(ccr_check.next_page().ok());
+    ASSERT_TRUE(ccr_check.load_page_data().ok());
+    EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1);
+    // restore config
+    config::parquet_page_cache_decompress_threshold = old_thresh;
+    config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
+
+TEST(ParquetPageCacheTest, MultipleReadersShareCachedEntry) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    double old_thresh = config::parquet_page_cache_decompress_threshold;
+    bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+    config::parquet_page_cache_decompress_threshold = 100.0;
+    config::enable_parquet_cache_compressed_pages = false;
+
+    // Create a v2 cached page and then instantiate multiple readers that hit 
the cache
+    std::string path = "test_shared_handles";
+    tparquet::PageHeader hdr;
+    hdr.type = tparquet::PageType::DATA_PAGE_V2;
+    int rl = 2;
+    int dl = 1;
+    hdr.__isset.data_page_header_v2 = true;
+    hdr.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+    hdr.data_page_header_v2.__set_definition_levels_byte_length(dl);
+    hdr.data_page_header_v2.__set_is_compressed(false);
+    hdr.data_page_header_v2.__set_num_values(1);
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&hdr, &header_bytes).ok());
+    std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+    std::vector<uint8_t> payload = {0x0A, 0x0B};
+    std::vector<uint8_t> cached;
+    cached.insert(cached.end(), header_bytes.begin(), header_bytes.end());
+    cached.insert(cached.end(), level_bytes.begin(), level_bytes.end());
+    cached.insert(cached.end(), payload.begin(), payload.end());
+
+    size_t total = cached.size();
+    auto* page = new DataPage(total, true, segment_v2::DATA_PAGE);
+    memcpy(page->data(), cached.data(), total);
+    page->reset_size(total);
+    PageCacheHandle handle;
+    size_t header_start = 512;
+    int64_t mtime = 0;
+    StoragePageCache::CacheKey key(fmt::format("{}::{}", path, mtime),
+                                   static_cast<size_t>(header_start + total), 
header_start);
+    StoragePageCache::instance()->insert(key, page, &handle, 
segment_v2::DATA_PAGE);
+
+    // Create multiple readers that will hit cache
+    const int N = 4;
+    for (int i = 0; i < N; ++i) {
+        std::vector<uint8_t> reader_backing(5000, 0);
+        memcpy(reader_backing.data() + header_start, cached.data(), total);
+        FakeBufferedReader reader(path, reader_backing);
+        tparquet::ColumnChunk cc;
+        cc.meta_data.__set_data_page_offset(512);
+        cc.meta_data.__set_total_compressed_size(total);
+        cc.meta_data.__set_num_values(1);
+        cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+        FieldSchema fs;
+        fs.repetition_level = rl;
+        fs.definition_level = dl;
+        ColumnChunkReader<false, false> ccr(&reader, &cc, &fs, nullptr, 0, 
nullptr, ctx);
+        ASSERT_TRUE(ccr.init().ok());
+        ASSERT_TRUE(ccr.load_page_data().ok());
+        Slice s = ccr.get_page_data();
+        ASSERT_EQ(s.size, payload.size());
+        EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+        const Slice& rep = ccr.v2_rep_levels();
+        const Slice& def = ccr.v2_def_levels();
+        ASSERT_EQ(rep.size, rl);
+        ASSERT_EQ(def.size, dl);
+    }
+    // restore config
+    config::parquet_page_cache_decompress_threshold = old_thresh;
+    config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
diff --git a/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp
index 1d9c62ebfad..e3695b2b545 100644
--- a/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_thrift_test.cpp
@@ -197,8 +197,9 @@ static Status get_column_values(io::FileReaderSPtr 
file_reader, tparquet::Column
 
     io::BufferedFileStreamReader stream_reader(file_reader, start_offset, 
chunk_size, 1024);
 
+    ParquetPageReadContext page_read_ctx;
     ColumnChunkReader<false, false> chunk_reader(&stream_reader, column_chunk, 
field_schema,
-                                                 nullptr, total_rows, nullptr);
+                                                 nullptr, total_rows, nullptr, 
page_read_ctx);
     // initialize chunk reader
     static_cast<void>(chunk_reader.init());
     // seek to next page header
diff --git a/be/test/vec/exec/orc/orc_file_reader_test.cpp 
b/be/test/vec/exec/orc/orc_file_reader_test.cpp
index 9e1003c397f..4c71129cdbb 100644
--- a/be/test/vec/exec/orc/orc_file_reader_test.cpp
+++ b/be/test/vec/exec/orc/orc_file_reader_test.cpp
@@ -41,6 +41,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return 0; }
+
     void set_data(const std::string& data) { _data = data; }
 
 protected:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 576c3019144..2120dc5e73a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -506,6 +506,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String SHOW_USER_DEFAULT_ROLE = 
"show_user_default_role";
 
     public static final String ENABLE_PAGE_CACHE = "enable_page_cache";
+    public static final String ENABLE_PARQUET_FILE_PAGE_CACHE = 
"enable_parquet_file_page_cache";
 
     public static final String MINIDUMP_PATH = "minidump_path";
 
@@ -2232,6 +2233,13 @@ public class SessionVariable implements Serializable, 
Writable {
             needForward = true)
     public boolean enablePageCache = true;
 
+    @VariableMgr.VarAttr(
+            name = ENABLE_PARQUET_FILE_PAGE_CACHE,
+            description = {"控制是否启用 Parquet file page cache。默认为 true。",
+                    "Controls whether to use Parquet file page cache. The 
default is true."},
+            needForward = true)
+    public boolean enableParquetFilePageCache = true;
+
     @VariableMgr.VarAttr(name = ENABLE_FOLD_NONDETERMINISTIC_FN)
     public boolean enableFoldNondeterministicFn = false;
 
@@ -5115,6 +5123,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
         tResult.setEnablePageCache(enablePageCache);
 
+        tResult.setEnableParquetFilePageCache(enableParquetFilePageCache);
+
         tResult.setFileCacheBasePath(fileCacheBasePath);
 
         tResult.setEnableInvertedIndexQuery(enableInvertedIndexQuery);
@@ -5129,6 +5139,8 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setEnableOrcLazyMat(enableOrcLazyMat);
         tResult.setEnableParquetFilterByMinMax(enableParquetFilterByMinMax);
         
tResult.setEnableParquetFilterByBloomFilter(enableParquetFilterByBloomFilter);
+
+        tResult.setEnableParquetFilePageCache(enableParquetFilePageCache);
         tResult.setEnableOrcFilterByMinMax(enableOrcFilterByMinMax);
         tResult.setCheckOrcInitSargsSuccess(checkOrcInitSargsSuccess);
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 9fbc2058232..3f1c5feedb9 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -436,6 +436,8 @@ struct TQueryOptions {
   // hash table expansion thresholds since all data is local.
   202: optional bool single_backend_query = false;
 
+  185: optional bool enable_parquet_file_page_cache = true;
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to