This is an automated email from the ASF dual-hosted git repository.

kakachen pushed a commit to branch tpc_preview4-external
in repository https://gitbox.apache.org/repos/asf/doris.git

commit db54cd036af850bb9173d9fb99fb9e7433ca2fb2
Author: kakachen <[email protected]>
AuthorDate: Tue Dec 23 21:40:07 2025 +0800

    [feature](vparquet-reader) Implements parquet file page cache.
---
 be/src/common/config.cpp                           |   5 +
 be/src/common/config.h                             |   4 +
 be/src/io/cache/cached_remote_file_reader.h        |   2 +
 be/src/io/file_factory.cpp                         |  15 +
 be/src/io/file_factory.h                           |   6 +
 be/src/io/fs/broker_file_reader.cpp                |   6 +-
 be/src/io/fs/broker_file_reader.h                  |   5 +-
 be/src/io/fs/broker_file_system.cpp                |   2 +-
 be/src/io/fs/buffered_reader.h                     |  12 +
 be/src/io/fs/file_reader.h                         |   3 +
 be/src/io/fs/hdfs_file_reader.cpp                  |   7 +-
 be/src/io/fs/hdfs_file_reader.h                    |   5 +-
 be/src/io/fs/http_file_reader.cpp                  |   7 +-
 be/src/io/fs/http_file_reader.h                    |   5 +-
 be/src/io/fs/http_file_system.cpp                  |   2 +-
 be/src/io/fs/local_file_reader.h                   |   2 +
 be/src/io/fs/s3_file_reader.h                      |   2 +
 be/src/io/fs/stream_load_pipe.h                    |   2 +
 be/src/io/fs/tracing_file_reader.h                 |   2 +
 be/src/vec/exec/format/orc/orc_file_reader.h       |   2 +
 .../parquet/vparquet_column_chunk_reader.cpp       | 313 ++++++--
 .../format/parquet/vparquet_column_chunk_reader.h  |  51 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  28 +-
 .../exec/format/parquet/vparquet_column_reader.h   |  45 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  |  13 +-
 .../exec/format/parquet/vparquet_group_reader.h    |   3 +-
 .../exec/format/parquet/vparquet_page_reader.cpp   | 141 +++-
 .../vec/exec/format/parquet/vparquet_page_reader.h |  88 ++-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  44 +-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   8 +
 be/test/io/fs/buffered_reader_test.cpp             |   6 +
 .../format/file_reader/file_meta_cache_test.cpp    |   2 +
 .../format/parquet/parquet_page_cache_test.cpp     | 859 +++++++++++++++++++++
 be/test/vec/exec/orc/orc_file_reader_test.cpp      |   2 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  27 +
 gensrc/thrift/PaloInternalService.thrift           |   4 +
 36 files changed, 1636 insertions(+), 94 deletions(-)

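The patch caches parquet pages in the BE StoragePageCache and decides per page whether to store the decompressed or the compressed bytes. A minimal standalone sketch of that decision (the helper name choose_cache_form is hypothetical; the real code in the diff below reads config::parquet_page_cache_decompress_threshold and config::enable_parquet_cache_compressed_pages):

    #include <cstdint>

    enum class CacheForm { kDecompressed, kCompressed, kNone };

    // Hypothetical mirror of the policy in ColumnChunkReader::load_page_data():
    // cache decompressed bytes while the expansion ratio stays under the
    // threshold, otherwise fall back to caching compressed bytes if enabled.
    CacheForm choose_cache_form(int64_t uncompressed_size, int64_t compressed_size,
                                bool codec_is_uncompressed, double decompress_threshold,
                                bool allow_compressed_cache) {
        if (compressed_size <= 0) {
            return CacheForm::kNone; // nothing sensible to compare against
        }
        if (codec_is_uncompressed ||
            static_cast<double>(uncompressed_size) <=
                    decompress_threshold * static_cast<double>(compressed_size)) {
            return CacheForm::kDecompressed;
        }
        return allow_compressed_cache ? CacheForm::kCompressed : CacheForm::kNone;
    }

With the default threshold of 1.5, a 1 MB compressed page that expands to 1.4 MB would be cached in decompressed form, while one that expands to 3 MB would only be cached compressed, and only if enable_parquet_cache_compressed_pages is set.
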
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 03a25a6f891..e9f5792028c 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -428,6 +428,11 @@ DEFINE_Int32(index_page_cache_percentage, "10");
 DEFINE_mBool(disable_storage_page_cache, "false");
 // whether to disable row cache feature in storage
 DEFINE_mBool(disable_storage_row_cache, "true");
+// Parquet page cache: threshold ratio for caching decompressed vs compressed pages
+// If uncompressed_size / compressed_size <= threshold, cache decompressed; otherwise cache compressed
+DEFINE_Double(parquet_page_cache_decompress_threshold, "1.5");
+// Parquet page cache: whether to enable caching compressed pages (when ratio exceeds threshold)
+DEFINE_Bool(enable_parquet_cache_compressed_pages, "false");
 // whether to disable pk page cache feature in storage
 DEFINE_Bool(disable_pk_storage_page_cache, "false");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7b19e8e4e4d..09e397f6323 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -444,6 +444,10 @@ DECLARE_Int32(index_page_cache_percentage);
 DECLARE_Bool(disable_storage_page_cache);
 // whether to disable row cache feature in storage
 DECLARE_mBool(disable_storage_row_cache);
+// Parquet page cache: threshold ratio for caching decompressed vs compressed pages
+DECLARE_Double(parquet_page_cache_decompress_threshold);
+// Parquet page cache: whether to enable caching compressed pages
+DECLARE_Bool(enable_parquet_cache_compressed_pages);
 // whether to disable pk page cache feature in storage
 DECLARE_Bool(disable_pk_storage_page_cache);
 
diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h
index 939471b62ea..20c1a47ce88 100644
--- a/be/src/io/cache/cached_remote_file_reader.h
+++ b/be/src/io/cache/cached_remote_file_reader.h
@@ -55,6 +55,8 @@ public:
 
     static std::pair<size_t, size_t> s_align_size(size_t offset, size_t size, size_t length);
 
+    int64_t mtime() const override { return _remote_file_reader->mtime(); }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index bd08bc20461..07484935643 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -203,6 +203,21 @@ Result<io::FileReaderSPtr> FileFactory::create_file_reader(
         const io::FileSystemProperties& system_properties,
         const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
         RuntimeProfile* profile) {
+    auto reader_res = _create_file_reader_internal(system_properties, file_description,
+                                                   reader_options, profile);
+    if (!reader_res.has_value()) {
+        return unexpected(std::move(reader_res).error());
+    }
+    auto file_reader = std::move(reader_res).value();
+    LOG_INFO("create file reader for path={}, size={}, mtime={}", 
file_description.path,
+             file_description.file_size, file_description.mtime);
+    return file_reader;
+}
+
+Result<io::FileReaderSPtr> FileFactory::_create_file_reader_internal(
+        const io::FileSystemProperties& system_properties,
+        const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
+        RuntimeProfile* profile) {
     TFileType::type type = system_properties.system_type;
     switch (type) {
     case TFileType::FILE_LOCAL: {
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 0ba791bd0a3..61e322ca0af 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -126,6 +126,12 @@ public:
 
 private:
     static std::string _get_fs_name(const io::FileDescription& file_description);
+
+    /// Create FileReader without FS
+    static Result<io::FileReaderSPtr> _create_file_reader_internal(
+            const io::FileSystemProperties& system_properties,
+            const io::FileDescription& file_description,
+            const io::FileReaderOptions& reader_options, RuntimeProfile* profile = nullptr);
 };
 
 } // namespace doris
diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp
index 102ea3e2477..41b2992f700 100644
--- a/be/src/io/fs/broker_file_reader.cpp
+++ b/be/src/io/fs/broker_file_reader.cpp
@@ -39,12 +39,14 @@ struct IOContext;
 
 BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size,
                                    TBrokerFD fd,
-                                   std::shared_ptr<BrokerServiceConnection> connection)
+                                   std::shared_ptr<BrokerServiceConnection> connection,
+                                   int64_t mtime)
         : _path(std::move(path)),
           _file_size(file_size),
           _broker_addr(broker_addr),
           _fd(fd),
-          _connection(std::move(connection)) {
+          _connection(std::move(connection)),
+          _mtime(mtime) {
     DorisMetrics::instance()->broker_file_open_reading->increment(1);
     DorisMetrics::instance()->broker_file_reader_total->increment(1);
 }
diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h
index 7d19edb32c0..2f6bd94b652 100644
--- a/be/src/io/fs/broker_file_reader.h
+++ b/be/src/io/fs/broker_file_reader.h
@@ -38,7 +38,7 @@ struct IOContext;
 class BrokerFileReader final : public FileReader {
 public:
     BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd,
-                     std::shared_ptr<BrokerServiceConnection> connection);
+                     std::shared_ptr<BrokerServiceConnection> connection, int64_t mtime = 0);
 
     ~BrokerFileReader() override;
 
@@ -50,6 +50,8 @@ public:
 
     bool closed() const override { return _closed.load(std::memory_order_acquire); }
 
+    int64_t mtime() const override { return _mtime; }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -62,6 +64,7 @@ private:
     TBrokerFD _fd;
 
     std::shared_ptr<BrokerServiceConnection> _connection;
+    int64_t _mtime;
     std::atomic<bool> _closed = false;
 };
 } // namespace doris::io
diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
index 8b0d5db23e2..b0dc89dc277 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -139,7 +139,7 @@ Status BrokerFileSystem::open_file_internal(const Path& file, FileReaderSPtr* re
                                error_msg(response->opStatus.message));
     }
     *reader = std::make_shared<BrokerFileReader>(_broker_addr, file, fsize, response->fd,
-                                                 _connection);
+                                                 _connection, opts.mtime);
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 5fe07176235..6ddcca02067 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -160,6 +160,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return _inner_reader->mtime(); }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -329,6 +331,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return _reader->mtime(); }
+
     // for test only
     size_t buffer_remaining() const { return _remaining; }
 
@@ -532,6 +536,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return _reader->mtime(); }
+
     void set_random_access_ranges(const std::vector<PrefetchRange>* random_access_ranges) {
         _random_access_ranges = random_access_ranges;
         for (auto& _pre_buffer : _pre_buffers) {
@@ -592,6 +598,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return _reader->mtime(); }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -626,6 +634,8 @@ public:
     virtual ~BufferedStreamReader() = default;
     // return the file path
     virtual std::string path() = 0;
+
+    virtual int64_t mtime() const = 0;
 };
 
 class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector {
@@ -639,6 +649,8 @@ public:
     Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override;
     std::string path() override { return _file->path(); }
 
+    int64_t mtime() const override { return _file->mtime(); }
+
 protected:
     void _collect_profile_before_close() override {
         if (_file != nullptr) {
diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h
index e6d8527e831..3df912cbad4 100644
--- a/be/src/io/fs/file_reader.h
+++ b/be/src/io/fs/file_reader.h
@@ -90,6 +90,9 @@ public:
 
     virtual const std::string& get_data_dir_path() { return VIRTUAL_REMOTE_DATA_DIR; }
 
+    // File modification time (seconds since epoch). Defaults to 0, meaning unknown.
+    virtual int64_t mtime() const = 0;
+
 protected:
     virtual Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                 const IOContext* io_ctx) = 0;
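The new virtual mtime() exists mainly so the parquet page cache key can change whenever the underlying file is rewritten. A hypothetical one-line illustration of the "path::mtime" prefix that _insert_page_into_cache() builds later in this patch (readers that cannot supply a modification time return 0, which still yields a stable key):

    #include <cstdint>
    #include <string>
    #include <fmt/format.h>

    // Hypothetical helper: a rewritten file gets a new mtime and therefore a
    // new cache key prefix, so stale entries from the old contents never match.
    std::string page_cache_key_prefix(const std::string& path, int64_t mtime) {
        return fmt::format("{}::{}", path, mtime);
    }
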
diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp
index 0e278dff0c8..b1d65a63ba0 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -66,16 +66,17 @@ Result<FileReaderSPtr> HdfsFileReader::create(Path full_path, const hdfsFS& fs,
     auto path = convert_path(full_path, fs_name);
     return get_file(fs, path, opts.mtime, opts.file_size).transform([&](auto&& accessor) {
         return std::make_shared<HdfsFileReader>(std::move(path), std::move(fs_name),
-                                                std::move(accessor), profile);
+                                                std::move(accessor), profile, opts.mtime);
     });
 }
 
 HdfsFileReader::HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
-                               RuntimeProfile* profile)
+                               RuntimeProfile* profile, int64_t mtime)
         : _path(std::move(path)),
           _fs_name(std::move(fs_name)),
           _accessor(std::move(accessor)),
-          _profile(profile) {
+          _profile(profile),
+          _mtime(mtime) {
     _handle = _accessor.get();
 
     DorisMetrics::instance()->hdfs_file_open_reading->increment(1);
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index 8556eea0de6..08f98bca29a 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -45,7 +45,7 @@ public:
                                          const FileReaderOptions& opts, RuntimeProfile* profile);
 
     HdfsFileReader(Path path, std::string fs_name, FileHandleCache::Accessor accessor,
-                   RuntimeProfile* profile);
+                   RuntimeProfile* profile, int64_t mtime = 0);
 
     ~HdfsFileReader() override;
 
@@ -57,6 +57,8 @@ public:
 
     bool closed() const override { return _closed.load(std::memory_order_acquire); }
 
+    int64_t mtime() const override { return _mtime; }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
@@ -86,6 +88,7 @@ private:
     CachedHdfsFileHandle* _handle = nullptr; // owned by _cached_file_handle
     std::atomic<bool> _closed = false;
     RuntimeProfile* _profile = nullptr;
+    int64_t _mtime;
 #ifdef USE_HADOOP_HDFS
     HDFSProfile _hdfs_profile;
 #endif
diff --git a/be/src/io/fs/http_file_reader.cpp b/be/src/io/fs/http_file_reader.cpp
index fb243179baf..5ad984039fc 100644
--- a/be/src/io/fs/http_file_reader.cpp
+++ b/be/src/io/fs/http_file_reader.cpp
@@ -34,7 +34,7 @@ Result<FileReaderSPtr> HttpFileReader::create(const std::string& url,
     ofi.path = Path(url);
     ofi.extend_info = props;
 
-    auto reader = std::make_shared<HttpFileReader>(ofi, url);
+    auto reader = std::make_shared<HttpFileReader>(ofi, url, opts.mtime);
 
     // Open the file to detect Range support and validate configuration
     RETURN_IF_ERROR_RESULT(reader->open(opts));
@@ -42,11 +42,12 @@ Result<FileReaderSPtr> HttpFileReader::create(const std::string& url,
     return reader;
 }
 
-HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url)
+HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime)
         : _extend_kv(fileInfo.extend_info),
           _path(fileInfo.path),
           _url(std::move(url)),
-          _client(std::make_unique<HttpClient>()) {
+          _client(std::make_unique<HttpClient>()),
+          _mtime(mtime) {
     auto etag_iter = _extend_kv.find("etag");
     if (etag_iter != _extend_kv.end()) {
         _etag = etag_iter->second;
diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h
index 607eedf3d1a..982e65905aa 100644
--- a/be/src/io/fs/http_file_reader.h
+++ b/be/src/io/fs/http_file_reader.h
@@ -41,7 +41,7 @@ public:
                                          const std::map<std::string, std::string>& props,
                                          const FileReaderOptions& opts, RuntimeProfile* profile);
 
-    explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url);
+    explicit HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime);
     ~HttpFileReader() override;
 
     Status open(const FileReaderOptions& opts);
@@ -52,6 +52,8 @@ public:
     bool closed() const override { return _closed.load(std::memory_order_acquire); }
     size_t size() const override { return _file_size; }
 
+    int64_t mtime() const override { return _mtime; }
+
 private:
     // Prepare and initialize the HTTP client for a new request
     Status prepare_client(bool set_fail_on_error = true);
@@ -78,6 +80,7 @@ private:
     int64_t _last_modified = 0;
     std::atomic<bool> _closed = false;
     std::unique_ptr<HttpClient> _client;
+    int64_t _mtime;
 
     // Configuration for non-Range request handling
     bool _enable_range_request = true;                         // Whether Range request is required
diff --git a/be/src/io/fs/http_file_system.cpp b/be/src/io/fs/http_file_system.cpp
index 92e175ca774..b1e8de354ad 100644
--- a/be/src/io/fs/http_file_system.cpp
+++ b/be/src/io/fs/http_file_system.cpp
@@ -56,7 +56,7 @@ Status HttpFileSystem::open_file_internal(const Path& path, FileReaderSPtr* read
     // Pass properties (including HTTP headers) to the file reader
     file_info.extend_info = _properties;
 
-    auto http_reader = std::make_shared<HttpFileReader>(file_info, path.native());
+    auto http_reader = std::make_shared<HttpFileReader>(file_info, path.native(), opts.mtime);
     RETURN_IF_ERROR(http_reader->open(opts));
     *reader = http_reader;
     return Status::OK();
diff --git a/be/src/io/fs/local_file_reader.h b/be/src/io/fs/local_file_reader.h
index 0ffd6ccde9e..97e071226a7 100644
--- a/be/src/io/fs/local_file_reader.h
+++ b/be/src/io/fs/local_file_reader.h
@@ -62,6 +62,8 @@ public:
 
     const std::string& get_data_dir_path() override { return _data_dir_path; }
 
+    int64_t mtime() const override { return 0; }
+
 private:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h
index 58294ec1891..40e3ac61d3a 100644
--- a/be/src/io/fs/s3_file_reader.h
+++ b/be/src/io/fs/s3_file_reader.h
@@ -53,6 +53,8 @@ public:
 
     bool closed() const override { return _closed.load(std::memory_order_acquire); }
 
+    int64_t mtime() const override { return 0; }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index cedab0b6c17..df137a9267c 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -57,6 +57,8 @@ public:
 
     size_t size() const override { return 0; }
 
+    int64_t mtime() const override { return 0; }
+
     // called when consumer finished
     Status close() override {
         if (!(_finished || _cancelled)) {
diff --git a/be/src/io/fs/tracing_file_reader.h b/be/src/io/fs/tracing_file_reader.h
index 39b70dfbb63..7a6651afd21 100644
--- a/be/src/io/fs/tracing_file_reader.h
+++ b/be/src/io/fs/tracing_file_reader.h
@@ -47,6 +47,8 @@ public:
     void _collect_profile_at_runtime() override { return _inner->collect_profile_at_runtime(); }
     void _collect_profile_before_close() override { return _inner->collect_profile_before_close(); }
 
+    int64_t mtime() const override { return _inner->mtime(); }
+
     FileReaderStats* stats() const { return _stats; }
     doris::io::FileReaderSPtr inner_reader() { return _inner; }
 
diff --git a/be/src/vec/exec/format/orc/orc_file_reader.h b/be/src/vec/exec/format/orc/orc_file_reader.h
index 503777e67c2..15aeed33242 100644
--- a/be/src/vec/exec/format/orc/orc_file_reader.h
+++ b/be/src/vec/exec/format/orc/orc_file_reader.h
@@ -54,6 +54,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return _inner_reader->mtime(); }
+
     // for test only
     const Statistics& statistics() const { return _statistics; }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index a474b76a518..807975def2f 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -25,6 +25,8 @@
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "io/fs/buffered_reader.h"
+#include "olap/page_cache.h"
 #include "util/bit_util.h"
 #include "util/block_compression.h"
 #include "util/runtime_profile.h"
@@ -51,7 +53,7 @@ template <bool IN_COLLECTION, bool OFFSET_INDEX>
 ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::ColumnChunkReader(
         io::BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
         FieldSchema* field_schema, const tparquet::OffsetIndex* offset_index, size_t total_rows,
-        io::IOContext* io_ctx)
+        io::IOContext* io_ctx, const ParquetPageReadContext& ctx)
         : _field_schema(field_schema),
           _max_rep_level(field_schema->repetition_level),
           _max_def_level(field_schema->definition_level),
@@ -59,7 +61,8 @@ ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::ColumnChunkReader(
           _metadata(column_chunk->meta_data),
           _offset_index(offset_index),
           _total_rows(total_rows),
-          _io_ctx(io_ctx) {}
+          _io_ctx(io_ctx),
+          _ctx(ctx) {}
 
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::init() {
@@ -68,7 +71,8 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::init() {
     size_t chunk_size = _metadata.total_compressed_size;
     // create page reader
     _page_reader = create_page_reader<IN_COLLECTION, OFFSET_INDEX>(
-            _stream_reader, _io_ctx, start_offset, chunk_size, _total_rows, _offset_index);
+            _stream_reader, _io_ctx, start_offset, chunk_size, _total_rows, _metadata,
+            _offset_index, _ctx);
     // get the block compression codec
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec));
     _state = INITIALIZED;
@@ -103,7 +107,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_parse_first_page_header(
     RETURN_IF_ERROR(parse_page_header());
 
     const tparquet::PageHeader* header = nullptr;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     if (header->type == tparquet::PageType::DICTIONARY_PAGE) {
         // the first page maybe directory page even if _metadata.__isset.dictionary_page_offset == false,
         // so we should parse the directory page in next_page()
@@ -124,8 +128,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
     RETURN_IF_ERROR(_page_reader->parse_page_header());
 
     const tparquet::PageHeader* header = nullptr;
-    ;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     int32_t page_num_values = _page_reader->is_header_v2() ? header->data_page_header_v2.num_values
                                                             : header->data_page_header.num_values;
     _remaining_rep_nums = page_num_values;
@@ -168,37 +171,151 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() {
     }
 
     const tparquet::PageHeader* header = nullptr;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     int32_t uncompressed_size = header->uncompressed_page_size;
+    bool page_loaded = false;
+
+    // First, try to reuse a cache handle previously discovered by PageReader
+    // (header-only lookup) to avoid a second lookup here. If no handle is
+    // attached, fall back to a StoragePageCache lookup for a decompressed page.
+    if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+        StoragePageCache::instance() != nullptr) {
+        if (_page_reader->has_page_cache_handle()) {
+            const PageCacheHandle& handle = _page_reader->page_cache_handle();
+            Slice cached = handle.data();
+            size_t header_size = _page_reader->header_bytes().size();
+            size_t levels_size = 0;
+            if (header->__isset.data_page_header_v2) {
+                const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
+                size_t rl = header_v2.repetition_levels_byte_length;
+                size_t dl = header_v2.definition_levels_byte_length;
+                levels_size = rl + dl;
+                _v2_rep_levels =
+                        Slice(reinterpret_cast<const uint8_t*>(cached.data) + header_size, rl);
+                _v2_def_levels =
+                        Slice(reinterpret_cast<const uint8_t*>(cached.data) + header_size + rl, dl);
+            }
+            // payload_slice points to the bytes after header and levels
+            Slice payload_slice(cached.data + header_size + levels_size,
+                                cached.size - header_size - levels_size);
 
-    if (_block_compress_codec != nullptr) {
-        Slice compressed_data;
-        RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
-        if (header->__isset.data_page_header_v2) {
-            const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
-            // uncompressed_size = rl + dl + uncompressed_data_size
-            // compressed_size = rl + dl + compressed_data_size
-            uncompressed_size -= header_v2.repetition_levels_byte_length +
-                                 header_v2.definition_levels_byte_length;
-            _get_uncompressed_levels(header_v2, compressed_data);
+            bool cache_payload_is_decompressed = _page_reader->is_cache_payload_decompressed();
+
+            if (cache_payload_is_decompressed) {
+                // Cached payload is already uncompressed
+                _page_data = payload_slice;
+            } else {
+                CHECK(_block_compress_codec);
+                // Decompress cached payload into _decompress_buf for decoding
+                size_t uncompressed_payload_size =
+                        header->__isset.data_page_header_v2
+                                ? static_cast<size_t>(header->uncompressed_page_size) - levels_size
+                                : static_cast<size_t>(header->uncompressed_page_size);
+                _reserve_decompress_buf(uncompressed_payload_size);
+                _page_data = Slice(_decompress_buf.get(), uncompressed_payload_size);
+                SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
+                _chunk_statistics.decompress_cnt++;
+                RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &_page_data));
+            }
+            // page cache counters were incremented when PageReader did the header-only
+            // cache lookup. Do not increment again to avoid double-counting.
+            page_loaded = true;
         }
-        bool is_v2_compressed =
-                header->__isset.data_page_header_v2 && header->data_page_header_v2.is_compressed;
-        if (header->__isset.data_page_header || is_v2_compressed) {
-            // check decompressed buffer size
-            _reserve_decompress_buf(uncompressed_size);
-            _page_data = Slice(_decompress_buf.get(), uncompressed_size);
-            SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
-            _chunk_statistics.decompress_cnt++;
-            RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
+    }
+
+    if (!page_loaded) {
+        if (_block_compress_codec != nullptr) {
+            Slice compressed_data;
+            RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
+            std::vector<uint8_t> level_bytes;
+            if (header->__isset.data_page_header_v2) {
+                const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
+                // uncompressed_size = rl + dl + uncompressed_data_size
+                // compressed_size = rl + dl + compressed_data_size
+                uncompressed_size -= header_v2.repetition_levels_byte_length +
+                                     header_v2.definition_levels_byte_length;
+                // copy level bytes (rl + dl) so that we can cache header + levels + uncompressed payload
+                size_t rl = header_v2.repetition_levels_byte_length;
+                size_t dl = header_v2.definition_levels_byte_length;
+                size_t level_sz = rl + dl;
+                if (level_sz > 0) {
+                    level_bytes.resize(level_sz);
+                    memcpy(level_bytes.data(), compressed_data.data, level_sz);
+                }
+                // now remove levels from compressed_data for decompression
+                _get_uncompressed_levels(header_v2, compressed_data);
+            }
+            bool is_v2_compressed = header->__isset.data_page_header_v2 &&
+                                    header->data_page_header_v2.is_compressed;
+            bool page_has_compression = header->__isset.data_page_header || is_v2_compressed;
+
+            if (page_has_compression) {
+                // Decompress payload for immediate decoding
+                _reserve_decompress_buf(uncompressed_size);
+                _page_data = Slice(_decompress_buf.get(), uncompressed_size);
+                SCOPED_RAW_TIMER(&_chunk_statistics.decompress_time);
+                _chunk_statistics.decompress_cnt++;
+                RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
+
+                // Decide whether to cache decompressed payload or compressed payload based on threshold
+                bool should_cache_decompressed = false;
+                if (header->compressed_page_size > 0) {
+                    should_cache_decompressed =
+                            (_metadata.codec == tparquet::CompressionCodec::UNCOMPRESSED) ||
+                            (static_cast<double>(header->uncompressed_page_size) <=
+                             static_cast<double>(config::parquet_page_cache_decompress_threshold) *
+                                     static_cast<double>(header->compressed_page_size));
+                }
+
+                if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+                    StoragePageCache::instance() != nullptr &&
+                    !_page_reader->header_bytes().empty()) {
+                    if (should_cache_decompressed) {
+                        _insert_page_into_cache(level_bytes, _page_data);
+                        _chunk_statistics.page_cache_decompressed_write_counter += 1;
+                    } else {
+                        if (config::enable_parquet_cache_compressed_pages) {
+                            // cache the compressed payload as-is (header | levels | compressed_payload)
+                            _insert_page_into_cache(
+                                    level_bytes, Slice(compressed_data.data, compressed_data.size));
+                            _chunk_statistics.page_cache_compressed_write_counter += 1;
+                        }
+                    }
+                }
+            } else {
+                // no compression on this page, use the data directly
+                _page_data = Slice(compressed_data.data, compressed_data.size);
+                if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+                    StoragePageCache::instance() != nullptr) {
+                    _insert_page_into_cache(level_bytes, _page_data);
+                    _chunk_statistics.page_cache_decompressed_write_counter += 1;
+                }
+            }
         } else {
-            // Don't need decompress
-            _page_data = Slice(compressed_data.data, compressed_data.size);
-        }
-    } else {
-        RETURN_IF_ERROR(_page_reader->get_page_data(_page_data));
-        if (header->__isset.data_page_header_v2) {
-            _get_uncompressed_levels(header->data_page_header_v2, _page_data);
+            // For uncompressed page, we may still need to extract v2 levels
+            std::vector<uint8_t> level_bytes;
+            Slice uncompressed_data;
+            RETURN_IF_ERROR(_page_reader->get_page_data(uncompressed_data));
+            if (header->__isset.data_page_header_v2) {
+                const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
+                size_t rl = header_v2.repetition_levels_byte_length;
+                size_t dl = header_v2.definition_levels_byte_length;
+                size_t level_sz = rl + dl;
+                if (level_sz > 0) {
+                    level_bytes.resize(level_sz);
+                    memcpy(level_bytes.data(), uncompressed_data.data, level_sz);
+                }
+                _get_uncompressed_levels(header_v2, uncompressed_data);
+            }
+            // copy page data out
+            _page_data = Slice(uncompressed_data.data, uncompressed_data.size);
+            // Optionally cache uncompressed data for uncompressed pages
+            if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+                StoragePageCache::instance() != nullptr) {
+                _insert_page_into_cache(level_bytes, _page_data);
+                _chunk_statistics.page_cache_decompressed_write_counter += 1;
+            }
         }
     }
 
@@ -243,7 +360,15 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_data() {
         _decoders[static_cast<int>(encoding)] = std::move(page_decoder);
         _page_decoder = _decoders[static_cast<int>(encoding)].get();
     }
-    // Reset page data for each page
+    // Reset page data for each page.
+    // If this is a v2 data page, _page_data currently contains rl+dl followed by payload.
+    // The decoder expects payload-only, so strip the level bytes into a temporary Slice
+    // that points into the same cached memory (so ownership remains with the cache handle).
+    // Slice payload_slice = _page_data;
+    // if (header->__isset.data_page_header_v2) {
+    //     const tparquet::DataPageHeaderV2& header_v2 = header->data_page_header_v2;
+    //     _get_uncompressed_levels(header_v2, payload_slice);
+    // }
     RETURN_IF_ERROR(_page_decoder->set_data(&_page_data));
 
     _state = DATA_LOADED;
@@ -253,7 +378,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_decode_dict_page() {
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_decode_dict_page() {
     const tparquet::PageHeader* header = nullptr;
-    RETURN_IF_ERROR(_page_reader->get_page_header(header));
+    RETURN_IF_ERROR(_page_reader->get_page_header(&header));
     DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header->type);
     SCOPED_RAW_TIMER(&_chunk_statistics.decode_dict_time);
 
@@ -270,16 +395,88 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_decode_dict_page() {
     // Prepare dictionary data
     int32_t uncompressed_size = header->uncompressed_page_size;
     auto dict_data = make_unique_buffer<uint8_t>(uncompressed_size);
-    if (_block_compress_codec != nullptr) {
-        Slice compressed_data;
-        RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
-        Slice dict_slice(dict_data.get(), uncompressed_size);
-        RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &dict_slice));
-    } else {
-        Slice dict_slice;
-        RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice));
-        // The data is stored by BufferedStreamReader, we should copy it out
+    bool dict_loaded = false;
+
+    // Try to load dictionary page from cache
+    if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+        StoragePageCache::instance() != nullptr) {
+        if (_page_reader->has_page_cache_handle()) {
+            const PageCacheHandle& handle = _page_reader->page_cache_handle();
+            Slice cached = handle.data();
+            size_t header_size = _page_reader->header_bytes().size();
+            // Dictionary page layout in cache: header | payload (compressed or uncompressed)
+            Slice payload_slice(cached.data + header_size, cached.size - header_size);
+
+            bool cache_payload_is_decompressed = _page_reader->is_cache_payload_decompressed();
+
+            if (cache_payload_is_decompressed) {
+                // Use cached decompressed dictionary data
+                memcpy(dict_data.get(), payload_slice.data, payload_slice.size);
+                dict_loaded = true;
+            } else {
+                CHECK(_block_compress_codec);
+                // Decompress cached compressed dictionary data
+                Slice dict_slice(dict_data.get(), uncompressed_size);
+                RETURN_IF_ERROR(_block_compress_codec->decompress(payload_slice, &dict_slice));
+                dict_loaded = true;
+            }
+
+            // When dictionary page is loaded from cache, we need to skip the page data
+            // to update the offset correctly (similar to calling get_page_data())
+            if (dict_loaded) {
+                _page_reader->skip_page_data();
+            }
+        }
+    }
+
+    if (!dict_loaded) {
+        // Load and decompress dictionary page from file
+        if (_block_compress_codec != nullptr) {
+            Slice compressed_data;
+            RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
+            Slice dict_slice(dict_data.get(), uncompressed_size);
+            RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &dict_slice));
+
+            // Decide whether to cache decompressed or compressed dictionary based on threshold
+            bool should_cache_decompressed = false;
+            if (header->compressed_page_size > 0) {
+                should_cache_decompressed =
+                        (static_cast<double>(header->uncompressed_page_size) <=
+                         static_cast<double>(config::parquet_page_cache_decompress_threshold) *
+                                 static_cast<double>(header->compressed_page_size));
+            }
+
+            if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+                StoragePageCache::instance() != nullptr && !_page_reader->header_bytes().empty()) {
+                std::vector<uint8_t> empty_levels; // Dictionary pages don't have levels
+                if (should_cache_decompressed) {
+                    // Cache the decompressed dictionary page
+                    _insert_page_into_cache(empty_levels, dict_slice);
+                    _chunk_statistics.page_cache_decompressed_write_counter += 1;
+                } else {
+                    if (config::enable_parquet_cache_compressed_pages) {
+                        // Cache the compressed dictionary page
+                        _insert_page_into_cache(empty_levels,
+                                                Slice(compressed_data.data, compressed_data.size));
+                        _chunk_statistics.page_cache_compressed_write_counter += 1;
+                    }
+                }
+            }
+        } else {
+            Slice dict_slice;
+            RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice));
+            // The data is stored by BufferedStreamReader, we should copy it out
+            memcpy(dict_data.get(), dict_slice.data, dict_slice.size);
+
+            // Cache the uncompressed dictionary page
+            if (_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
+                StoragePageCache::instance() != nullptr && !_page_reader->header_bytes().empty()) {
+                std::vector<uint8_t> empty_levels;
+                Slice payload(dict_data.get(), uncompressed_size);
+                _insert_page_into_cache(empty_levels, payload);
+                _chunk_statistics.page_cache_decompressed_write_counter += 1;
+            }
+        }
     }
 
     // Cache page decoder
@@ -305,6 +502,32 @@ void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_reserve_decompress_buf(siz
     }
 }
 
+template <bool IN_COLLECTION, bool OFFSET_INDEX>
+void ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::_insert_page_into_cache(
+        const std::vector<uint8_t>& level_bytes, const Slice& payload) {
+    StoragePageCache::CacheKey key(
+            fmt::format("{}::{}", _stream_reader->path(), 
_stream_reader->mtime()),
+            _page_reader->file_end_offset(), 
_page_reader->header_start_offset());
+    const std::vector<uint8_t>& header_bytes = _page_reader->header_bytes();
+    size_t total = header_bytes.size() + level_bytes.size() + payload.size;
+    auto* page = new DataPage(total, true, segment_v2::DATA_PAGE);
+    size_t pos = 0;
+    memcpy(page->data() + pos, header_bytes.data(), header_bytes.size());
+    pos += header_bytes.size();
+    if (!level_bytes.empty()) {
+        memcpy(page->data() + pos, level_bytes.data(), level_bytes.size());
+        pos += level_bytes.size();
+    }
+    if (payload.size > 0) {
+        memcpy(page->data() + pos, payload.data, payload.size);
+        pos += payload.size;
+    }
+    page->reset_size(total);
+    PageCacheHandle handle;
+    StoragePageCache::instance()->insert(key, page, &handle, segment_v2::DATA_PAGE);
+    _chunk_statistics.page_cache_write_counter += 1;
+}
+
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::skip_values(size_t num_values,
                                                                    bool skip_data) {
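Each cache entry written by _insert_page_into_cache above is a flat buffer laid out as header bytes | v2 level bytes (empty for v1 data pages and dictionary pages) | payload. A small standalone sketch of that layout, with plain std::vector standing in for the DataPage/PageCacheHandle types (helper names are hypothetical):

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Hypothetical mirror of the cache-entry layout: header | levels | payload.
    std::vector<uint8_t> build_cache_entry(const std::vector<uint8_t>& header_bytes,
                                           const std::vector<uint8_t>& level_bytes,
                                           const uint8_t* payload, size_t payload_size) {
        std::vector<uint8_t> entry;
        entry.reserve(header_bytes.size() + level_bytes.size() + payload_size);
        entry.insert(entry.end(), header_bytes.begin(), header_bytes.end());
        entry.insert(entry.end(), level_bytes.begin(), level_bytes.end());
        entry.insert(entry.end(), payload, payload + payload_size);
        return entry;
    }

    // On a cache hit the reader recovers the payload by skipping header and
    // levels, which is what load_page_data() does with the PageCacheHandle slice.
    const uint8_t* cache_entry_payload(const std::vector<uint8_t>& entry,
                                       size_t header_size, size_t levels_size) {
        return entry.data() + header_size + levels_size;
    }
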
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 93d0f92ecec..c380e99e087 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -61,6 +61,18 @@ struct ColumnChunkReaderStatistics {
     int64_t skip_page_header_num = 0;
     int64_t parse_page_header_num = 0;
     int64_t read_page_header_time = 0;
+    // page cache metrics
+    // total pages read (from cache or file)
+    int64_t page_read_counter = 0;
+    int64_t page_cache_write_counter = 0;
+    int64_t page_cache_compressed_write_counter = 0;
+    int64_t page_cache_decompressed_write_counter = 0;
+    // number of cache hits (either compressed or decompressed)
+    int64_t page_cache_hit_counter = 0;
+    int64_t page_cache_missing_counter = 0;
+    // per-hit breakdown
+    int64_t page_cache_compressed_hit_counter = 0;
+    int64_t page_cache_decompressed_hit_counter = 0;
 };
 
 /**
@@ -74,6 +86,17 @@ struct ColumnChunkReaderStatistics {
         int64_t decode_level_time = 0;
         int64_t skip_page_header_num = 0;
         int64_t parse_page_header_num = 0;
+        // page cache metrics
+        // total pages read (from cache or file)
+        int64_t page_read_counter = 0;
+        int64_t page_cache_write_counter = 0;
+        int64_t page_cache_compressed_write_counter = 0;
+        int64_t page_cache_decompressed_write_counter = 0;
+        // number of cache hits (either compressed or decompressed)
+        int64_t page_cache_hit_counter = 0;
+        // per-hit breakdown
+        int64_t page_cache_compressed_hit_counter = 0;
+        int64_t page_cache_decompressed_hit_counter = 0;
     };
  * // Create chunk reader
  * ColumnChunkReader chunk_reader(BufferedStreamReader* reader,
@@ -109,7 +132,8 @@ public:
 
     ColumnChunkReader(io::BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
                       FieldSchema* field_schema, const tparquet::OffsetIndex* offset_index,
-                      size_t total_row, io::IOContext* io_ctx);
+                      size_t total_row, io::IOContext* io_ctx,
+                      const ParquetPageReadContext& ctx = ParquetPageReadContext());
     ~ColumnChunkReader() = default;
 
     // Initialize chunk reader, will generate the decoder and codec.
@@ -167,6 +191,21 @@ public:
                 _page_reader->page_statistics().parse_page_header_num;
         _chunk_statistics.read_page_header_time =
                 _page_reader->page_statistics().read_page_header_time;
+        _chunk_statistics.page_read_counter += _page_reader->page_statistics().page_read_counter;
+        _chunk_statistics.page_cache_write_counter +=
+                _page_reader->page_statistics().page_cache_write_counter;
+        _chunk_statistics.page_cache_compressed_write_counter +=
+                _page_reader->page_statistics().page_cache_compressed_write_counter;
+        _chunk_statistics.page_cache_decompressed_write_counter +=
+                _page_reader->page_statistics().page_cache_decompressed_write_counter;
+        _chunk_statistics.page_cache_hit_counter +=
+                _page_reader->page_statistics().page_cache_hit_counter;
+        _chunk_statistics.page_cache_missing_counter +=
+                _page_reader->page_statistics().page_cache_missing_counter;
+        _chunk_statistics.page_cache_compressed_hit_counter +=
+                _page_reader->page_statistics().page_cache_compressed_hit_counter;
+        _chunk_statistics.page_cache_decompressed_hit_counter +=
+                _page_reader->page_statistics().page_cache_decompressed_hit_counter;
         return _chunk_statistics;
     }
 
@@ -205,6 +244,12 @@ public:
                                  size_t* result_rows, bool* cross_page);
     Status load_cross_page_nested_row(std::vector<level_t>& rep_levels, bool* cross_page);
 
+    // Test helpers / accessors
+    Slice get_page_data() const { return _page_data; }
+    const Slice& v2_rep_levels() const { return _v2_rep_levels; }
+    const Slice& v2_def_levels() const { return _v2_def_levels; }
+    ColumnChunkReaderStatistics& statistics() { return chunk_statistics(); }
+
 private:
     enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED };
 
@@ -214,6 +259,7 @@ private:
 
     void _reserve_decompress_buf(size_t size);
     int32_t _get_type_length();
+    void _insert_page_into_cache(const std::vector<uint8_t>& level_bytes, const Slice& payload);
 
     void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, Slice& page_data);
     Status _skip_nested_rows_in_page(size_t num_rows);
@@ -233,6 +279,9 @@ private:
     std::unique_ptr<PageReader<IN_COLLECTION, OFFSET_INDEX>> _page_reader;
     BlockCompressionCodec* _block_compress_codec = nullptr;
 
+    // Session-level parquet page cache options
+    ParquetPageReadContext _ctx;
+
     LevelDecoder _rep_level_decoder;
     LevelDecoder _def_level_decoder;
     size_t _chunk_parsed_values = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index db6042ce703..12c6573ea74 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -111,13 +111,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                                    size_t max_buf_size,
                                    std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
                                    bool in_collection, const std::set<uint64_t>& column_ids,
-                                   const std::set<uint64_t>& filter_column_ids) {
+                                   const std::set<uint64_t>& filter_column_ids, RuntimeState* state,
+                                   const std::string& created_by) {
     size_t total_rows = row_group.num_rows;
     if (field->data_type->get_primitive_type() == TYPE_ARRAY) {
         std::unique_ptr<ParquetColumnReader> element_reader;
         RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
                                element_reader, max_buf_size, col_offsets, true, column_ids,
-                               filter_column_ids));
+                               filter_column_ids, state, created_by));
         //        element_reader->set_nested_column();
         auto array_reader = ArrayColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
         RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
@@ -132,7 +133,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
             // Create key reader
             RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
                                    key_reader, max_buf_size, col_offsets, true, column_ids,
-                                   filter_column_ids));
+                                   filter_column_ids, state, created_by));
             //            key_reader->set_nested_column();
         } else {
             auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz,
@@ -145,7 +146,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
             // Create value reader
             RETURN_IF_ERROR(create(file, &field->children[1], row_group, row_ranges, ctz, io_ctx,
                                    value_reader, max_buf_size, col_offsets, true, column_ids,
-                                   filter_column_ids));
+                                   filter_column_ids, state, created_by));
             //            value_reader->set_nested_column();
         } else {
             auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz,
@@ -208,14 +209,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                 auto scalar_reader = ScalarColumnReader<true, false>::create_unique(
                         row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
 
-                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size));
+                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state, created_by));
                 scalar_reader->_filter_column_ids = filter_column_ids;
                 reader.reset(scalar_reader.release());
             } else {
                 auto scalar_reader = ScalarColumnReader<true, true>::create_unique(
                         row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
 
-                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size));
+                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state, created_by));
                 scalar_reader->_filter_column_ids = filter_column_ids;
                 reader.reset(scalar_reader.release());
             }
@@ -224,14 +225,14 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                 auto scalar_reader = ScalarColumnReader<false, false>::create_unique(
                         row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
 
-                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size));
+                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state, created_by));
                 scalar_reader->_filter_column_ids = filter_column_ids;
                 reader.reset(scalar_reader.release());
             } else {
                 auto scalar_reader = ScalarColumnReader<false, true>::create_unique(
                         row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
 
-                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size));
+                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state, created_by));
                 scalar_reader->_filter_column_ids = filter_column_ids;
                 reader.reset(scalar_reader.release());
             }
@@ -249,7 +250,9 @@ void ParquetColumnReader::_generate_read_ranges(RowRange page_row_range,
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::init(io::FileReaderSPtr file,
                                                              FieldSchema* field,
-                                                             size_t max_buf_size) {
+                                                             size_t max_buf_size,
+                                                             RuntimeState* state,
+                                                             const std::string& created_by) {
     _field_schema = field;
     auto& chunk_meta = _chunk_meta.meta_data;
     int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset
@@ -265,8 +268,13 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::init(io::FileReaderSPtr
     }
     _stream_reader = std::make_unique<io::BufferedFileStreamReader>(file, chunk_start, chunk_len,
                                                                     prefetch_buffer_size);
+    // Build Parquet page read context: enable_parquet_file_page_cache from session, others from BE config
+    ParquetPageReadContext ctx(
+            (state == nullptr) ? true : state->query_options().enable_parquet_file_page_cache,
+            created_by);
+
+    _chunk_reader = std::make_unique<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>>(
-            _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx);
+            _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx, ctx);
     RETURN_IF_ERROR(_chunk_reader->init());
     return Status::OK();
 }
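ScalarColumnReader::init now threads the session setting into a ParquetPageReadContext; when no RuntimeState is available (for example in unit tests) the cache defaults to enabled. A hedged sketch of that fallback with stubbed-out state types (the real code reads state->query_options().enable_parquet_file_page_cache as shown above):

    // Hypothetical stand-ins for RuntimeState/TQueryOptions, for illustration only.
    struct FakeQueryOptions {
        bool enable_parquet_file_page_cache = true;
    };
    struct FakeRuntimeState {
        FakeQueryOptions query_options;
    };

    // Mirrors the fallback in ScalarColumnReader::init(): default to enabled
    // when there is no state, otherwise honor the session variable.
    bool resolve_enable_parquet_file_page_cache(const FakeRuntimeState* state) {
        return state == nullptr ? true : state->query_options.enable_parquet_file_page_cache;
    }
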
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 4a49473a69f..8bcd9712716 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -65,7 +65,15 @@ public:
                   decode_null_map_time(0),
                   skip_page_header_num(0),
                   parse_page_header_num(0),
-                  read_page_header_time(0) {}
+                  read_page_header_time(0),
+                  page_read_counter(0),
+                  page_cache_write_counter(0),
+                  page_cache_compressed_write_counter(0),
+                  page_cache_decompressed_write_counter(0),
+                  page_cache_hit_counter(0),
+                  page_cache_missing_counter(0),
+                  page_cache_compressed_hit_counter(0),
+                  page_cache_decompressed_hit_counter(0) {}
 
         ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time)
                 : page_index_read_calls(0),
@@ -78,7 +86,15 @@ public:
                   decode_null_map_time(null_map_time),
                   skip_page_header_num(cs.skip_page_header_num),
                   parse_page_header_num(cs.parse_page_header_num),
-                  read_page_header_time(cs.read_page_header_time) {}
+                  read_page_header_time(cs.read_page_header_time),
+                  page_read_counter(cs.page_read_counter),
+                  page_cache_write_counter(cs.page_cache_write_counter),
+                  page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter),
+                  page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter),
+                  page_cache_hit_counter(cs.page_cache_hit_counter),
+                  page_cache_missing_counter(cs.page_cache_missing_counter),
+                  page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter),
+                  page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter) {}
 
         int64_t page_index_read_calls;
         int64_t decompress_time;
@@ -91,6 +107,14 @@ public:
         int64_t skip_page_header_num;
         int64_t parse_page_header_num;
         int64_t read_page_header_time;
+        int64_t page_read_counter;
+        int64_t page_cache_write_counter;
+        int64_t page_cache_compressed_write_counter;
+        int64_t page_cache_decompressed_write_counter;
+        int64_t page_cache_hit_counter;
+        int64_t page_cache_missing_counter;
+        int64_t page_cache_compressed_hit_counter;
+        int64_t page_cache_decompressed_hit_counter;
 
         void merge(ColumnStatistics& col_statistics) {
             page_index_read_calls += col_statistics.page_index_read_calls;
@@ -104,6 +128,17 @@ public:
             skip_page_header_num += col_statistics.skip_page_header_num;
             parse_page_header_num += col_statistics.parse_page_header_num;
             read_page_header_time += col_statistics.read_page_header_time;
+            page_read_counter += col_statistics.page_read_counter;
+            page_cache_write_counter += 
col_statistics.page_cache_write_counter;
+            page_cache_compressed_write_counter +=
+                    col_statistics.page_cache_compressed_write_counter;
+            page_cache_decompressed_write_counter +=
+                    col_statistics.page_cache_decompressed_write_counter;
+            page_cache_hit_counter += col_statistics.page_cache_hit_counter;
+            page_cache_missing_counter += 
col_statistics.page_cache_missing_counter;
+            page_cache_compressed_hit_counter += 
col_statistics.page_cache_compressed_hit_counter;
+            page_cache_decompressed_hit_counter +=
+                    col_statistics.page_cache_decompressed_hit_counter;
         }
     };
 
@@ -132,7 +167,8 @@ public:
                          std::unique_ptr<ParquetColumnReader>& reader, size_t 
max_buf_size,
                          std::unordered_map<int, tparquet::OffsetIndex>& 
col_offsets,
                          bool in_collection = false, const std::set<uint64_t>& 
column_ids = {},
-                         const std::set<uint64_t>& filter_column_ids = {});
+                         const std::set<uint64_t>& filter_column_ids = {},
+                         RuntimeState* state = nullptr, const std::string& 
created_by = "");
     virtual const std::vector<level_t>& get_rep_level() const = 0;
     virtual const std::vector<level_t>& get_def_level() const = 0;
     virtual ColumnStatistics column_statistics() = 0;
@@ -169,7 +205,8 @@ public:
               _chunk_meta(chunk_meta),
               _offset_index(offset_index) {}
     ~ScalarColumnReader() override { close(); }
-    Status init(io::FileReaderSPtr file, FieldSchema* field, size_t 
max_buf_size);
+    Status init(io::FileReaderSPtr file, FieldSchema* field, size_t 
max_buf_size,
+                RuntimeState* state, const std::string& created_by = "");
     Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
                             const 
std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
                             FilterMap& filter_map, size_t batch_size, size_t* 
read_rows, bool* eof,
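
The hit/missing counters above are only incremented when a cache lookup actually happens, while
page_read_counter is incremented for every parsed page. A hypothetical helper (not part of the
patch) that turns them into a hit ratio for a profile check or a test assertion:

    // Assumes the ColumnStatistics struct declared above.
    static double page_cache_hit_ratio(const ColumnStatistics& s) {
        const int64_t lookups = s.page_cache_hit_counter + s.page_cache_missing_counter;
        if (lookups == 0) {
            return 0.0;
        }
        return static_cast<double>(s.page_cache_hit_counter) / static_cast<double>(lookups);
    }
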
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 31161f2c6ed..d57264c1987 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -85,7 +85,8 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
                                const PositionDeleteContext& 
position_delete_ctx,
                                const LazyReadContext& lazy_read_ctx, 
RuntimeState* state,
                                const std::set<uint64_t>& column_ids,
-                               const std::set<uint64_t>& filter_column_ids)
+                               const std::set<uint64_t>& filter_column_ids,
+                               const std::string& created_by)
         : _file_reader(file_reader),
           _read_table_columns(read_columns),
           _row_group_id(row_group_id),
@@ -98,7 +99,8 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
           _state(state),
           _obj_pool(new ObjectPool()),
           _column_ids(column_ids),
-          _filter_column_ids(filter_column_ids) {}
+          _filter_column_ids(filter_column_ids),
+          _created_by(created_by) {}
 
 RowGroupReader::~RowGroupReader() {
     _column_readers.clear();
@@ -131,9 +133,10 @@ Status RowGroupReader::init(
         auto read_file_col = 
_table_info_node_ptr->children_file_column_name(read_table_col);
         auto* field = schema.get_column(read_file_col);
         std::unique_ptr<ParquetColumnReader> reader;
-        RETURN_IF_ERROR(ParquetColumnReader::create(
-                _file_reader, field, _row_group_meta, _read_ranges, _ctz, 
_io_ctx, reader,
-                max_buf_size, col_offsets, false, _column_ids, 
_filter_column_ids));
+        RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, 
_row_group_meta,
+                                                    _read_ranges, _ctz, 
_io_ctx, reader,
+                                                    max_buf_size, col_offsets, 
false, _column_ids,
+                                                    _filter_column_ids, 
_state, _created_by));
         if (reader == nullptr) {
             VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader 
failed";
             return Status::Corruption("Init row group reader failed");
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index e9c4470af17..f504292c035 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -159,7 +159,7 @@ public:
                    const PositionDeleteContext& position_delete_ctx,
                    const LazyReadContext& lazy_read_ctx, RuntimeState* state,
                    const std::set<uint64_t>& column_ids,
-                   const std::set<uint64_t>& filter_column_ids);
+                   const std::set<uint64_t>& filter_column_ids, const 
std::string& created_by = "");
 
     ~RowGroupReader();
     Status init(const FieldDescriptor& schema, RowRanges& row_ranges,
@@ -261,6 +261,7 @@ private:
     std::shared_ptr<ObjectPool> _obj_pool;
     const std::set<uint64_t>& _column_ids;
     const std::set<uint64_t>& _filter_column_ids;
+    std::string _created_by; // Parquet file's created_by field
     bool _is_row_group_filtered = false;
 
     RowGroupIndex _current_row_group_idx {0, 0, 0};
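
_created_by is threaded through so it can eventually reach ParquetPageReadContext; its intended
consumer is the (currently commented-out) _is_always_compressed() helper in
vparquet_page_reader.cpp, which handles ARROW-17100: parquet-cpp before 2.0.0 marks V2 pages as
compressed even when they are not. A condensed sketch of that check, assuming the
ParsedVersion/SemanticVersion helpers behave as the disabled code implies:

    // Sketch only; mirrors the commented-out helper later in this patch.
    bool always_compressed = false;
    std::unique_ptr<ParsedVersion> parsed;
    if (VersionParser::parse(created_by, &parsed).ok() &&
        parsed->application() == "parquet-cpp" && parsed->version().has_value()) {
        std::unique_ptr<SemanticVersion> sem;
        if (SemanticVersion::parse(parsed->version().value(), &sem).ok()) {
            always_compressed = sem->compare_to(SemanticVersion(2, 0, 0)) < 0;
        }
    }
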
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index 3b6d7fdcb9b..38478ab2863 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -26,6 +26,8 @@
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "io/fs/buffered_reader.h"
+#include "olap/page_cache.h"
+#include "parquet_common.h"
 #include "util/runtime_profile.h"
 #include "util/slice.h"
 #include "util/thrift_util.h"
@@ -40,18 +42,57 @@ namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 static constexpr size_t INIT_PAGE_HEADER_SIZE = 128;
 
+// // Check if the file was created by a version that always marks pages as 
compressed
+// // regardless of the actual compression state (parquet-cpp < 2.0.0)
+// static bool _is_always_compressed(const ParquetPageReadContext& ctx) {
+//     if (ctx.created_by.empty()) {
+//         return false;
+//     }
+
+//     // Parse the version string
+//     std::unique_ptr<ParsedVersion> parsed_version;
+//     Status status = VersionParser::parse(ctx.created_by, &parsed_version);
+//     if (!status.ok()) {
+//         return false;
+//     }
+
+//     // Check if it's parquet-cpp
+//     if (parsed_version->application() != "parquet-cpp") {
+//         return false;
+//     }
+
+//     // Check if version < 2.0.0
+//     if (!parsed_version->version().has_value()) {
+//         return false;
+//     }
+
+//     std::unique_ptr<SemanticVersion> semantic_version;
+//     status = SemanticVersion::parse(parsed_version->version().value(), 
&semantic_version);
+//     if (!status.ok()) {
+//         return false;
+//     }
+
+//     // parquet-cpp versions < 2.0.0 always report compressed
+//     static const SemanticVersion PARQUET_CPP_FIXED_VERSION(2, 0, 0);
+//     return semantic_version->compare_to(PARQUET_CPP_FIXED_VERSION) < 0;
+// }
+
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 PageReader<IN_COLLECTION, OFFSET_INDEX>::PageReader(io::BufferedStreamReader* 
reader,
                                                     io::IOContext* io_ctx, 
uint64_t offset,
                                                     uint64_t length, size_t 
total_rows,
-                                                    const 
tparquet::OffsetIndex* offset_index)
+                                                    const 
tparquet::ColumnMetaData& metadata,
+                                                    const 
tparquet::OffsetIndex* offset_index,
+                                                    const 
ParquetPageReadContext& ctx)
         : _reader(reader),
           _io_ctx(io_ctx),
           _offset(offset),
           _start_offset(offset),
           _end_offset(offset + length),
           _total_rows(total_rows),
-          _offset_index(offset_index) {
+          _metadata(metadata),
+          _offset_index(offset_index),
+          _ctx(ctx) {
     _next_header_offset = _offset;
     _state = INITIALIZED;
 
@@ -77,11 +118,104 @@ Status PageReader<IN_COLLECTION, 
OFFSET_INDEX>::parse_page_header() {
         return Status::IOError("Should skip or load current page to get next 
page");
     }
 
+    _page_statistics.page_read_counter += 1;
+
+    // Parse the current page header, preferring the page cache below; on a miss the
+    // header bytes read from the file are kept for a possible cache insertion later.
     const uint8_t* page_header_buf = nullptr;
     size_t max_size = _end_offset - _offset;
     size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
     const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 
20;
     uint32_t real_header_size = 0;
+
+    // Try to serve the page from the page cache first. A cached entry stores the
+    // serialized header + optional v2 levels + payload (compressed or decompressed),
+    // so on a hit the page header can be parsed directly from the cached bytes and
+    // the file read for the header is skipped.
+    if (_ctx.enable_parquet_file_page_cache && 
!config::disable_storage_page_cache &&
+        StoragePageCache::instance() != nullptr) {
+        PageCacheHandle handle;
+        StoragePageCache::CacheKey key(fmt::format("{}::{}", _reader->path(), 
_reader->mtime()),
+                                       _end_offset, _offset);
+        if (StoragePageCache::instance()->lookup(key, &handle, 
segment_v2::DATA_PAGE)) {
+            // Parse header directly from cached data
+            _page_cache_handle = std::move(handle);
+            Slice s = _page_cache_handle.data();
+            real_header_size = cast_set<uint32_t>(s.size);
+            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
+            auto st = deserialize_thrift_msg(reinterpret_cast<const 
uint8_t*>(s.data),
+                                             &real_header_size, true, 
&_cur_page_header);
+            if (!st.ok()) return st;
+            // Increment page cache counters for a true cache hit on 
header+payload
+            _page_statistics.page_cache_hit_counter += 1;
+            // Detect whether the cached payload is compressed or decompressed 
and record
+            // the appropriate counter. Cached layout is: header | optional 
levels | payload
+
+            // Determine if payload is compressed using V2 is_compressed field 
if available
+            // bool payload_is_compressed = true;
+            // if (_cur_page_header.type == tparquet::PageType::DATA_PAGE_V2) {
+            //     const auto& page_header_v2 = 
_cur_page_header.data_page_header_v2;
+            //     if (page_header_v2.__isset.is_compressed) {
+            //         payload_is_compressed = page_header_v2.is_compressed;
+            //     }
+            // }
+
+            // // ARROW-17100: [C++][Parquet] Fix backwards compatibility for 
ParquetV2 data pages written prior to 3.0.0 per ARROW-10353 #13665
+            // // https://github.com/apache/arrow/pull/13665/files
+            // // Prior to parquet-cpp version 2.0.0, is_compressed was always 
set to false in column headers,
+            // // even if compression was used. See ARROW-17100.
+            // bool always_compressed = _is_always_compressed(_ctx);
+            // payload_is_compressed |= always_compressed;
+
+            // // Apply codec check: if codec is UNCOMPRESSED, payload cannot 
be compressed
+            // payload_is_compressed = payload_is_compressed &&
+            //                         (_metadata.codec != 
tparquet::CompressionCodec::UNCOMPRESSED);
+
+            // // Save the computed result for use by ColumnChunkReader
+            // _payload_is_compressed = payload_is_compressed;
+
+            bool is_cache_payload_decompressed = true;
+            if (_cur_page_header.compressed_page_size > 0) {
+                is_cache_payload_decompressed =
+                        (_metadata.codec == 
tparquet::CompressionCodec::UNCOMPRESSED) ||
+                        
(static_cast<double>(_cur_page_header.uncompressed_page_size) <=
+                         
static_cast<double>(config::parquet_page_cache_decompress_threshold) *
+                                 
static_cast<double>(_cur_page_header.compressed_page_size));
+            }
+
+            if (is_cache_payload_decompressed) {
+                _page_statistics.page_cache_decompressed_hit_counter += 1;
+            } else {
+                _page_statistics.page_cache_compressed_hit_counter += 1;
+            }
+
+            _is_cache_payload_decompressed = is_cache_payload_decompressed;
+
+            if constexpr (OFFSET_INDEX == false) {
+                if (is_header_v2()) {
+                    _end_row = _start_row + 
_cur_page_header.data_page_header_v2.num_rows;
+                } else if constexpr (!IN_COLLECTION) {
+                    _end_row = _start_row + 
_cur_page_header.data_page_header.num_values;
+                }
+            }
+
+            // Save header bytes for later use (e.g., to insert updated cache 
entries)
+            _header_buf.assign(s.data, s.data + real_header_size);
+            _last_header_size = real_header_size;
+            _page_statistics.parse_page_header_num++;
+            _offset += real_header_size;
+            _next_header_offset = _offset + 
_cur_page_header.compressed_page_size;
+            _state = HEADER_PARSED;
+            return Status::OK();
+        } else {
+            _page_statistics.page_cache_missing_counter += 1;
+            // Clear any existing cache handle on miss to avoid holding stale 
handle
+            _page_cache_handle = PageCacheHandle();
+        }
+    }
+    // NOTE: consuming the cached payload, and inserting header + payload on a cache
+    // miss, is handled in ColumnChunkReader::load_page_data(). On a miss, PageReader
+    // is only responsible for parsing the header bytes from the file and saving
+    // them in `_header_buf` for that later insertion.
     while (true) {
         if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
             return Status::EndOfFile("stop");
@@ -115,6 +249,9 @@ Status PageReader<IN_COLLECTION, 
OFFSET_INDEX>::parse_page_header() {
         }
     }
 
+    // Save header bytes for possible cache insertion later
+    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
+    _last_header_size = real_header_size;
     _page_statistics.parse_page_header_num++;
     _offset += real_header_size;
     _next_header_offset = _offset + _cur_page_header.compressed_page_size;
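
Restating the hit path above: a page is keyed by "<path>::<mtime>", the column chunk's end
offset, and the page's header offset; on a hit the cached bytes begin with the serialized
Thrift page header, so the header is deserialized straight from the cache and no file read is
issued. A condensed, illustrative sketch (counters and error handling omitted):

    StoragePageCache::CacheKey key(fmt::format("{}::{}", _reader->path(), _reader->mtime()),
                                   _end_offset, _offset);
    PageCacheHandle handle;
    if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
        Slice cached = handle.data();  // header | optional v2 levels | payload
        uint32_t header_size = cast_set<uint32_t>(cached.size);
        RETURN_IF_ERROR(deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(cached.data),
                                               &header_size, true, &_cur_page_header));
        // the payload is consumed later by ColumnChunkReader::load_page_data() with no file read
    } else {
        // miss: read the header from the file and keep the bytes in _header_buf so that
        // ColumnChunkReader::load_page_data() can insert header + payload afterwards
    }
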
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index 9246819d59c..a722507c742 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -23,7 +23,9 @@
 #include <memory>
 
 #include "common/cast_set.h"
+#include "common/config.h"
 #include "common/status.h"
+#include "olap/page_cache.h"
 #include "util/block_compression.h"
 #include "vec/exec/format/parquet/parquet_common.h"
 namespace doris {
@@ -50,6 +52,15 @@ namespace doris::vectorized {
  * Use to deserialize parquet page header, and get the page data in iterator 
interface.
  */
 
+// Session-level options for parquet page reading/caching.
+struct ParquetPageReadContext {
+    bool enable_parquet_file_page_cache = true;
+    std::string created_by; // Parquet file's created_by field for version 
checking
+    ParquetPageReadContext() = default;
+    ParquetPageReadContext(bool enable, const std::string& created_by_str = "")
+            : enable_parquet_file_page_cache(enable), 
created_by(created_by_str) {}
+};
+
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 class PageReader {
 public:
@@ -58,11 +69,29 @@ public:
         int64_t skip_page_header_num = 0;
         int64_t parse_page_header_num = 0;
         int64_t read_page_header_time = 0;
+        // page cache metrics
+        // page_cache_hit_counter: number of times a cached page was used
+        int64_t page_cache_hit_counter = 0;
+        // page_cache_missing_counter: number of times a cached page was not 
found
+        int64_t page_cache_missing_counter = 0;
+        // page_cache_compressed_hit_counter: number of cache hits where the 
cached payload is compressed
+        int64_t page_cache_compressed_hit_counter = 0;
+        // page_cache_decompressed_hit_counter: number of cache hits where the 
cached payload is decompressed
+        int64_t page_cache_decompressed_hit_counter = 0;
+        // page_cache_write_counter: number of times this reader wrote an 
entry into the cache
+        int64_t page_cache_write_counter = 0;
+        // page_cache_compressed_write_counter: number of times this reader 
wrote compressed page into the cache
+        int64_t page_cache_compressed_write_counter = 0;
+        // page_cache_decompressed_write_counter: number of times this reader 
wrote decompressed page into the cache
+        int64_t page_cache_decompressed_write_counter = 0;
+        // page_read_counter: total pages read by this PageReader (includes 
cache hits and file reads)
+        int64_t page_read_counter = 0;
     };
 
     PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx, 
uint64_t offset,
-               uint64_t length, size_t total_rows,
-               const tparquet::OffsetIndex* offset_index = nullptr);
+               uint64_t length, size_t total_rows, const 
tparquet::ColumnMetaData& metadata,
+               const tparquet::OffsetIndex* offset_index = nullptr,
+               const ParquetPageReadContext& ctx = ParquetPageReadContext());
     ~PageReader() = default;
 
     bool has_next_page() const {
@@ -123,24 +152,61 @@ public:
         }
     }
 
-    Status get_page_header(const tparquet::PageHeader*& page_header) {
+    Status get_page_header(const tparquet::PageHeader** page_header) {
         if (UNLIKELY(_state != HEADER_PARSED)) {
             return Status::InternalError("Page header not parsed");
         }
-        page_header = &_cur_page_header;
+        *page_header = &_cur_page_header;
         return Status::OK();
     }
 
     Status get_page_data(Slice& slice);
 
+    // Skip page data and update offset (used when data is loaded from cache)
+    void skip_page_data() {
+        if (_state == HEADER_PARSED) {
+            _offset += _cur_page_header.compressed_page_size;
+            _state = DATA_LOADED;
+        }
+    }
+
+    // Expose header bytes info for cache insertion
+    uint32_t last_header_size() const { return _last_header_size; }
+    const std::vector<uint8_t>& header_bytes() const { return _header_buf; }
+    // header start offset for current page
+    int64_t header_start_offset() const {
+        return static_cast<int64_t>(_next_header_offset) - 
static_cast<int64_t>(_last_header_size) -
+               static_cast<int64_t>(_cur_page_header.compressed_page_size);
+    }
+    uint64_t file_end_offset() const { return _end_offset; }
+    bool cached_decompressed() const {
+        return static_cast<double>(_cur_page_header.uncompressed_page_size) <=
+               
static_cast<double>(config::parquet_page_cache_decompress_threshold) *
+                       
static_cast<double>(_cur_page_header.compressed_page_size);
+    }
+
     PageStatistics& page_statistics() { return _page_statistics; }
 
     bool is_header_v2() { return _cur_page_header.__isset.data_page_header_v2; 
}
 
+    // Returns whether the current page's cache payload is decompressed
+    bool is_cache_payload_decompressed() const { return 
_is_cache_payload_decompressed; }
+
     size_t start_row() const { return _start_row; }
 
     size_t end_row() const { return _end_row; }
 
+    // Accessors for cache handle
+    bool has_page_cache_handle() const { return _page_cache_handle.cache() != 
nullptr; }
+    const doris::PageCacheHandle& page_cache_handle() const { return 
_page_cache_handle; }
+
+    // Page cache members
+    doris::PageCacheHandle _page_cache_handle;
+    // stored header bytes when cache miss so we can insert header+payload 
into cache
+    std::vector<uint8_t> _header_buf;
+    // last parsed header size in bytes
+    uint32_t _last_header_size = 0;
+
 private:
     enum PageReaderState { INITIALIZED, HEADER_PARSED, DATA_LOADED };
     PageReaderState _state = INITIALIZED;
@@ -159,19 +225,27 @@ private:
     size_t _end_row = 0;
     // total rows in this column chunk
     size_t _total_rows = 0;
+    // Column metadata for this column chunk
+    const tparquet::ColumnMetaData& _metadata;
     // for page index
     size_t _page_index = 0;
     const tparquet::OffsetIndex* _offset_index;
 
+    // Session-level parquet page cache options
+    ParquetPageReadContext _ctx;
+
     tparquet::PageHeader _cur_page_header;
+    bool _is_cache_payload_decompressed = true;
 };
 
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
 std::unique_ptr<PageReader<IN_COLLECTION, OFFSET_INDEX>> create_page_reader(
         io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t 
offset, uint64_t length,
-        size_t total_rows, const tparquet::OffsetIndex* offset_index = 
nullptr) {
-    return std::make_unique<PageReader<IN_COLLECTION, OFFSET_INDEX>>(reader, 
io_ctx, offset, length,
-                                                                     
total_rows, offset_index);
+        size_t total_rows, const tparquet::ColumnMetaData& metadata,
+        const tparquet::OffsetIndex* offset_index = nullptr,
+        const ParquetPageReadContext& ctx = ParquetPageReadContext()) {
+    return std::make_unique<PageReader<IN_COLLECTION, OFFSET_INDEX>>(
+            reader, io_ctx, offset, length, total_rows, metadata, 
offset_index, ctx);
 }
 #include "common/compile_check_end.h"
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 76922608c2a..62dde51f009 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -183,6 +183,22 @@ void ParquetReader::_init_profile() {
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecompressTime", 
parquet_profile, 1);
         _parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL(
                 _profile, "DecompressCount", TUnit::UNIT, parquet_profile, 1);
+        _parquet_profile.page_read_counter = ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageReadCount", TUnit::UNIT, parquet_profile, 1);
+        _parquet_profile.page_cache_write_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheWriteCount", TUnit::UNIT, parquet_profile, 
1);
+        _parquet_profile.page_cache_compressed_write_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheCompressedWriteCount", TUnit::UNIT, 
parquet_profile, 1);
+        _parquet_profile.page_cache_decompressed_write_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheDecompressedWriteCount", TUnit::UNIT, 
parquet_profile, 1);
+        _parquet_profile.page_cache_hit_counter = ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheHitCount", TUnit::UNIT, parquet_profile, 
1);
+        _parquet_profile.page_cache_missing_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheMissingCount", TUnit::UNIT, 
parquet_profile, 1);
+        _parquet_profile.page_cache_compressed_hit_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheCompressedHitCount", TUnit::UNIT, 
parquet_profile, 1);
+        _parquet_profile.page_cache_decompressed_hit_counter = 
ADD_CHILD_COUNTER_WITH_LEVEL(
+                _profile, "PageCacheDecompressedHitCount", TUnit::UNIT, 
parquet_profile, 1);
         _parquet_profile.decode_header_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageHeaderDecodeTime", 
parquet_profile, 1);
         _parquet_profile.read_page_header_time =
@@ -789,12 +805,13 @@ Status ParquetReader::_next_row_group_reader() {
                                   _profile, _file_reader, io_ranges, 
merged_read_slice_size)
                         : _file_reader;
     }
-    _current_group_reader.reset(new RowGroupReader(
-            _io_ctx ? 
std::make_shared<io::TracingFileReader>(group_file_reader,
-                                                              
_io_ctx->file_reader_stats)
-                    : group_file_reader,
-            _read_table_columns, _current_row_group_index.row_group_id, 
row_group, _ctz, _io_ctx,
-            position_delete_ctx, _lazy_read_ctx, _state, _column_ids, 
_filter_column_ids));
+    _current_group_reader.reset(
+            new RowGroupReader(_io_ctx ? 
std::make_shared<io::TracingFileReader>(
+                                                 group_file_reader, 
_io_ctx->file_reader_stats)
+                                       : group_file_reader,
+                               _read_table_columns, 
_current_row_group_index.row_group_id,
+                               row_group, _ctz, _io_ctx, position_delete_ctx, 
_lazy_read_ctx,
+                               _state, _column_ids, _filter_column_ids, 
_t_metadata->created_by));
     _row_group_eof = false;
 
     _current_group_reader->set_current_row_group_idx(_current_row_group_index);
@@ -1209,6 +1226,21 @@ void ParquetReader::_collect_profile() {
                    _column_statistics.page_index_read_calls);
     COUNTER_UPDATE(_parquet_profile.decompress_time, 
_column_statistics.decompress_time);
     COUNTER_UPDATE(_parquet_profile.decompress_cnt, 
_column_statistics.decompress_cnt);
+    COUNTER_UPDATE(_parquet_profile.page_read_counter, 
_column_statistics.page_read_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_write_counter,
+                   _column_statistics.page_cache_write_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_compressed_write_counter,
+                   _column_statistics.page_cache_compressed_write_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_decompressed_write_counter,
+                   _column_statistics.page_cache_decompressed_write_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_hit_counter,
+                   _column_statistics.page_cache_hit_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_missing_counter,
+                   _column_statistics.page_cache_missing_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_compressed_hit_counter,
+                   _column_statistics.page_cache_compressed_hit_counter);
+    COUNTER_UPDATE(_parquet_profile.page_cache_decompressed_hit_counter,
+                   _column_statistics.page_cache_decompressed_hit_counter);
     COUNTER_UPDATE(_parquet_profile.decode_header_time, 
_column_statistics.decode_header_time);
     COUNTER_UPDATE(_parquet_profile.read_page_header_time,
                    _column_statistics.read_page_header_time);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 6652fe73092..a7a90846321 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -190,6 +190,14 @@ private:
         RuntimeProfile::Counter* file_footer_hit_cache = nullptr;
         RuntimeProfile::Counter* decompress_time = nullptr;
         RuntimeProfile::Counter* decompress_cnt = nullptr;
+        RuntimeProfile::Counter* page_read_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_write_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_compressed_write_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_decompressed_write_counter = 
nullptr;
+        RuntimeProfile::Counter* page_cache_hit_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_missing_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_compressed_hit_counter = nullptr;
+        RuntimeProfile::Counter* page_cache_decompressed_hit_counter = nullptr;
         RuntimeProfile::Counter* decode_header_time = nullptr;
         RuntimeProfile::Counter* read_page_header_time = nullptr;
         RuntimeProfile::Counter* decode_value_time = nullptr;
diff --git a/be/test/io/fs/buffered_reader_test.cpp 
b/be/test/io/fs/buffered_reader_test.cpp
index 3874b06c68c..bc92d22b178 100644
--- a/be/test/io/fs/buffered_reader_test.cpp
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -68,6 +68,8 @@ public:
 
     bool closed() const override { return _reader->closed(); }
 
+    int64_t mtime() const override { return _reader->mtime(); }
+
 private:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const io::IOContext* io_ctx) override {
@@ -96,6 +98,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return 0; }
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const io::IOContext* io_ctx) override {
@@ -130,6 +134,8 @@ public:
 
     bool closed() const override { return _delegate->closed(); }
 
+    int64_t mtime() const override { return _delegate->mtime(); }
+
     const io::PrefetchRange& last_read_range() const { return 
*_last_read_range; }
 
 protected:
diff --git a/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp 
b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
index 3aef8db8459..5ea6335d2b9 100644
--- a/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
+++ b/be/test/vec/exec/format/file_reader/file_meta_cache_test.cpp
@@ -38,6 +38,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return 0; }
+
     Status close() override {
         _closed = true;
         return Status::OK();
diff --git a/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp 
b/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp
new file mode 100644
index 00000000000..21f2fa4e82b
--- /dev/null
+++ b/be/test/vec/exec/format/parquet/parquet_page_cache_test.cpp
@@ -0,0 +1,859 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <fmt/format.h>
+#include <gtest/gtest.h>
+
+#include "common/config.h"
+#include "io/fs/buffered_reader.h"
+#include "olap/page_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/cache_manager.h"
+#include "util/block_compression.h"
+#include "util/faststring.h"
+#include "util/thrift_util.h"
+#include "vec/exec/format/parquet/schema_desc.h"
+#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
+#include "vec/exec/format/parquet/vparquet_page_reader.h"
+
+using namespace doris;
+using namespace doris::vectorized;
+
+class FakeBufferedReader : public io::BufferedStreamReader {
+public:
+    FakeBufferedReader(std::string path, std::vector<uint8_t> data)
+            : _path(std::move(path)), _data(std::move(data)) {}
+    Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t 
bytes_to_read,
+                      const doris::io::IOContext* io_ctx) override {
+        if (offset + bytes_to_read > _data.size()) return Status::IOError("Out 
of bounds");
+        *buf = _data.data() + offset;
+        return Status::OK();
+    }
+    Status read_bytes(Slice& slice, uint64_t offset, const 
doris::io::IOContext* io_ctx) override {
+        if (offset + slice.size > _data.size()) return Status::IOError("Out of 
bounds");
+        slice.data = reinterpret_cast<char*>(_data.data() + offset);
+        return Status::OK();
+    }
+    std::string path() override { return _path; }
+
+    int64_t mtime() const override { return 0; }
+
+private:
+    std::string _path;
+    std::vector<uint8_t> _data;
+};
+
+TEST(ParquetPageCacheTest, CacheHitReturnsDecompressedPayload) {
+    // setup storage page cache
+    // 
ExecEnv::GetInstance()->set_storage_page_cache(StoragePageCache::create_global_cache(1
 << 20, 10, 0));
+
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+
+    // construct thrift PageHeader (uncompressed payload) and payload
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    header.__set_compressed_page_size(4);
+    header.__set_uncompressed_page_size(4);
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> payload = {0x11, 0x22, 0x33, 0x44};
+    std::vector<uint8_t> cached_data;
+    cached_data.insert(cached_data.end(), header_bytes.begin(), 
header_bytes.end());
+    cached_data.insert(cached_data.end(), payload.begin(), payload.end());
+
+    std::string path = "test_parquet_cache_file";
+    int64_t header_offset = 128;
+    // make file_end_offset consistent with reader/page reader end offset used 
in test
+    int64_t file_end_offset = header_offset + 
static_cast<int64_t>(cached_data.size());
+
+    // insert into cache
+    int64_t mtime = 0;
+    StoragePageCache::CacheKey key(fmt::format("{}::{}", path, mtime),
+                                   static_cast<size_t>(file_end_offset), 
header_offset);
+    size_t total = cached_data.size();
+    auto* page = new DataPage(total, true, segment_v2::DATA_PAGE);
+    memcpy(page->data(), cached_data.data(), total);
+    page->reset_size(total);
+    PageCacheHandle handle;
+    StoragePageCache::instance()->insert(key, page, &handle, 
segment_v2::DATA_PAGE);
+
+    // create fake reader and a ColumnChunkReader to verify cache hit
+    // ensure the reader contains the same header+payload at the header offset 
so header parsing succeeds
+    std::vector<uint8_t> backing(256, 0);
+    memcpy(backing.data() + header_offset, cached_data.data(), total);
+    FakeBufferedReader reader(path, backing);
+    // prepare column chunk metadata so ColumnChunkReader uses same offsets
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(header_offset);
+    cc.meta_data.__set_total_compressed_size(total);
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    FieldSchema field_schema;
+    field_schema.repetition_level = 0;
+    field_schema.definition_level = 0;
+
+    ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, nullptr, 
0, nullptr, ctx);
+    ASSERT_TRUE(ccr.init().ok());
+    // ASSERT_TRUE(ccr.next_page().ok());
+    // load_page_data should hit the cache and return decompressed payload
+    ASSERT_TRUE(ccr.load_page_data().ok());
+    Slice s = ccr.get_page_data();
+    ASSERT_EQ(s.size, payload.size());
+    ASSERT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+    // stats: ensure there was a page read and at least one hit recorded
+    auto& statistics = ccr.statistics();
+    EXPECT_EQ(statistics.page_read_counter, 1);
+    EXPECT_EQ(statistics.page_cache_hit_counter, 1);
+    EXPECT_EQ(statistics.page_cache_decompressed_hit_counter, 1);
+    // now safe to cleanup cache
+    // {
+    //     StoragePageCache* _cache_ptr = StoragePageCache::instance();
+    //     // Ensure any outstanding PageCacheHandle is released before 
destroying cache
+    //     handle = PageCacheHandle();
+    //     ExecEnv::GetInstance()->set_storage_page_cache(nullptr);
+    //     delete _cache_ptr;
+    //     // Clear CacheManager registration so tests do not leave global 
state
+    //     // behind when unregister_cache is a no-op under BE_TEST.
+    //     CacheManager::instance()->clear_for_test();
+    // }
+}
+
+TEST(ParquetPageCacheTest, DecompressedPageInsertedByColumnChunkReader) {
+    // 
ExecEnv::GetInstance()->set_storage_page_cache(StoragePageCache::create_global_cache(1
 << 20, 10, 0));
+
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // ensure decompressed pages are cached via BE config
+    double old_thresh = config::parquet_page_cache_decompress_threshold;
+    bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+    config::parquet_page_cache_decompress_threshold = 100.0;
+    config::enable_parquet_cache_compressed_pages = false;
+
+    // construct uncompressed header + payload in file buffer
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    header.__set_compressed_page_size(4);
+    header.__set_uncompressed_page_size(4);
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> payload = {0x55, 0x66, 0x77, 0x88};
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), 
header_bytes.end());
+    file_data.insert(file_data.end(), payload.begin(), payload.end());
+
+    std::string path = "test_parquet_insert_file";
+    int64_t header_offset = 0;
+
+    FakeBufferedReader reader(path, file_data);
+
+    // prepare column chunk metadata
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(header_offset);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    {
+        FieldSchema field_schema;
+        field_schema.repetition_level = 0;
+        field_schema.definition_level = 0;
+        ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, 
nullptr, 0, nullptr, ctx);
+        ASSERT_TRUE(ccr.init().ok());
+        // ASSERT_TRUE(ccr.next_page().ok());
+        ASSERT_TRUE(ccr.load_page_data().ok());
+
+        // Now cache should have an entry; verify by creating a fresh 
ColumnChunkReader and hitting cache
+        ColumnChunkReader<false, false> ccr_check(&reader, &cc, &field_schema, 
nullptr, 0, nullptr,
+                                                  ctx);
+        ASSERT_TRUE(ccr_check.init().ok());
+        // ASSERT_TRUE(ccr_check.next_page().ok());
+        ASSERT_TRUE(ccr_check.load_page_data().ok());
+        Slice s = ccr_check.get_page_data();
+        ASSERT_EQ(s.size, payload.size());
+        EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+        EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1);
+    }
+    // cleanup cache after readers go out of scope
+    // {
+    //     StoragePageCache* _cache_ptr = StoragePageCache::instance();
+    //     ExecEnv::GetInstance()->set_storage_page_cache(nullptr);
+    //     delete _cache_ptr;
+    //     CacheManager::instance()->clear_for_test();
+    // }
+    // restore config
+    config::parquet_page_cache_decompress_threshold = old_thresh;
+    config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
+
+TEST(ParquetPageCacheTest, V2LevelsPreservedInCache) {
+    // 
ExecEnv::GetInstance()->set_storage_page_cache(StoragePageCache::create_global_cache(1
 << 20, 10, 0));
+
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // ensure decompressed pages are cached via BE config
+    double old_thresh = config::parquet_page_cache_decompress_threshold;
+    bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+    config::parquet_page_cache_decompress_threshold = 100.0;
+    config::enable_parquet_cache_compressed_pages = false;
+
+    // construct v2 header + levels + payload in file buffer (uncompressed)
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE_V2;
+    int rl = 2;
+    int dl = 1;
+    int payload_sz = 2;
+    header.__set_compressed_page_size(rl + dl + payload_sz);
+    header.__set_uncompressed_page_size(rl + dl + payload_sz);
+    header.__isset.data_page_header_v2 = true;
+    header.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+    header.data_page_header_v2.__set_definition_levels_byte_length(dl);
+    header.data_page_header_v2.__set_is_compressed(false);
+    header.data_page_header_v2.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+    std::vector<uint8_t> payload = {0xAA, 0xBB};
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), 
header_bytes.end());
+    file_data.insert(file_data.end(), level_bytes.begin(), level_bytes.end());
+    file_data.insert(file_data.end(), payload.begin(), payload.end());
+
+    std::string path = "test_v2_levels_file";
+    FakeBufferedReader reader(path, file_data);
+
+    // prepare column chunk metadata
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    FieldSchema field_schema;
+    field_schema.repetition_level = 0;
+    field_schema.definition_level = 0;
+    {
+        ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, 
nullptr, 0, nullptr, ctx);
+        ASSERT_TRUE(ccr.init().ok());
+        // ASSERT_TRUE(ccr.next_page().ok());
+        ASSERT_TRUE(ccr.load_page_data().ok());
+
+        // Now cache should have entry; verify by creating a ColumnChunkReader 
and hitting cache
+        ColumnChunkReader<false, false> ccr_check(&reader, &cc, &field_schema, 
nullptr, 0, nullptr,
+                                                  ctx);
+        ASSERT_TRUE(ccr_check.init().ok());
+        // ASSERT_TRUE(ccr_check.next_page().ok());
+        ASSERT_TRUE(ccr_check.load_page_data().ok());
+        Slice s = ccr_check.get_page_data();
+        ASSERT_EQ(s.size, payload.size());
+        EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+    }
+
+    // Verify that a fresh ColumnChunkReader reusing cache gets level bytes 
preserved
+    FieldSchema field_schema2;
+    field_schema2.repetition_level = 2; // v2 levels present
+    field_schema2.definition_level = 1;
+    ColumnChunkReader<false, false> ccr2(&reader, &cc, &field_schema2, 
nullptr, 0, nullptr);
+    ASSERT_TRUE(ccr2.init().ok());
+    // ASSERT_TRUE(ccr2.next_page().ok());
+    ASSERT_TRUE(ccr2.load_page_data().ok());
+    // Level slices should equal the original level bytes
+    const Slice& rep = ccr2.v2_rep_levels();
+    const Slice& def = ccr2.v2_def_levels();
+    auto& statistics = ccr2.statistics();
+    EXPECT_GT(statistics.page_cache_hit_counter, 0);
+    // because threshold is set to cache decompressed, we should see 
decompressed hits
+    EXPECT_GT(statistics.page_cache_decompressed_hit_counter, 0);
+    ASSERT_EQ(def.size, dl);
+    EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl));
+    EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl));
+    // // cleanup cache after readers have been destroyed
+    // {
+    //     StoragePageCache* _cache_ptr = StoragePageCache::instance();
+    //     ExecEnv::GetInstance()->set_storage_page_cache(nullptr);
+    //     delete _cache_ptr;
+    //     CacheManager::instance()->clear_for_test();
+    // }
+    // restore config
+    config::parquet_page_cache_decompress_threshold = old_thresh;
+    config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
+
+TEST(ParquetPageCacheTest, CompressedV1PageCachedAndHit) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // Note: parquet_page_cache_decompress_threshold and 
enable_parquet_cache_compressed_pages
+    // are now BE config variables, not context fields
+
+    // construct compressed v1 header + compressed payload in file buffer
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+
+    std::vector<uint8_t> payload = {0x01, 0x02, 0x03, 0x04};
+
+    // compress payload using a block codec
+    BlockCompressionCodec* codec = nullptr;
+    
ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, 
&codec).ok());
+    faststring compressed_fast;
+    std::vector<Slice> inputs;
+    inputs.emplace_back(payload.data(), payload.size());
+    ASSERT_TRUE(codec->compress(inputs, payload.size(), 
&compressed_fast).ok());
+
+    
header.__set_compressed_page_size(static_cast<int32_t>(compressed_fast.size()));
+    header.__set_uncompressed_page_size(static_cast<int32_t>(payload.size()));
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), 
header_bytes.end());
+    file_data.insert(file_data.end(), compressed_fast.data(),
+                     compressed_fast.data() + compressed_fast.size());
+
+    std::string path = "test_compressed_v1_file";
+    FakeBufferedReader reader(path, file_data);
+
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+    FieldSchema field_schema;
+    field_schema.repetition_level = 0;
+    field_schema.definition_level = 0;
+
+    // Load page to trigger decompression + cache insert
+    ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, nullptr, 
0, nullptr, ctx);
+    ASSERT_TRUE(ccr.init().ok());
+    // ASSERT_TRUE(ccr.next_page().ok());
+    ASSERT_TRUE(ccr.load_page_data().ok());
+    EXPECT_EQ(ccr.statistics().page_cache_write_counter, 1);
+
+    // Now verify a fresh reader hits the cache and returns payload
+    ColumnChunkReader<false, false> ccr_check(&reader, &cc, &field_schema, 
nullptr, 0, nullptr,
+                                              ctx);
+    ASSERT_TRUE(ccr_check.init().ok());
+    // ASSERT_TRUE(ccr_check.next_page().ok());
+    ASSERT_TRUE(ccr_check.load_page_data().ok());
+    Slice s = ccr_check.get_page_data();
+    ASSERT_EQ(s.size, payload.size());
+    EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+    EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1);
+}
+
+TEST(ParquetPageCacheTest, CompressedV2LevelsPreservedInCache) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // Note: parquet_page_cache_decompress_threshold and 
enable_parquet_cache_compressed_pages
+    // are now BE config variables, not context fields
+
+    // construct v2 header + levels + compressed payload in file buffer
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE_V2;
+    int rl = 2;
+    int dl = 1;
+    //int payload_sz = 2;
+    header.__isset.data_page_header_v2 = true;
+    header.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+    header.data_page_header_v2.__set_definition_levels_byte_length(dl);
+    header.data_page_header_v2.__set_is_compressed(true);
+    header.data_page_header_v2.__set_num_values(1);
+
+    std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+    std::vector<uint8_t> payload = {0xAA, 0xBB};
+
+    // compress payload
+    BlockCompressionCodec* codec = nullptr;
+    
ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, 
&codec).ok());
+    faststring compressed_fast;
+    std::vector<Slice> inputs;
+    inputs.emplace_back(payload.data(), payload.size());
+    ASSERT_TRUE(codec->compress(inputs, payload.size(), 
&compressed_fast).ok());
+
+    // compressed page: levels (uncompressed) followed by compressed payload
+    std::vector<uint8_t> compressed_page;
+    compressed_page.insert(compressed_page.end(), level_bytes.begin(), 
level_bytes.end());
+    compressed_page.insert(compressed_page.end(), compressed_fast.data(),
+                           compressed_fast.data() + compressed_fast.size());
+
+    
header.__set_compressed_page_size(static_cast<int32_t>(compressed_page.size()));
+    
header.__set_uncompressed_page_size(static_cast<int32_t>(level_bytes.size() + 
payload.size()));
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), 
header_bytes.end());
+    file_data.insert(file_data.end(), compressed_page.begin(), 
compressed_page.end());
+
+    std::string path = "test_compressed_v2_file";
+    FakeBufferedReader reader(path, file_data);
+
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+    FieldSchema field_schema;
+    field_schema.repetition_level = 0;
+    field_schema.definition_level = 0;
+
+    // Load page to trigger decompression + cache insert
+    ColumnChunkReader<false, false> ccr(&reader, &cc, &field_schema, nullptr, 
0, nullptr, ctx);
+    ASSERT_TRUE(ccr.init().ok());
+    // ASSERT_TRUE(ccr.next_page().ok());
+    ASSERT_TRUE(ccr.load_page_data().ok());
+    EXPECT_EQ(ccr.statistics().page_cache_write_counter, 1);
+
+    // Now verify a fresh reader hits the cache and v2 levels are preserved
+    FieldSchema field_schema2;
+    field_schema2.repetition_level = rl;
+    field_schema2.definition_level = dl;
+    ColumnChunkReader<false, false> ccr_check(&reader, &cc, &field_schema2, 
nullptr, 0, nullptr,
+                                              ctx);
+    ASSERT_TRUE(ccr_check.init().ok());
+    // ASSERT_TRUE(ccr_check.next_page().ok());
+    ASSERT_TRUE(ccr_check.load_page_data().ok());
+    Slice s = ccr_check.get_page_data();
+    ASSERT_EQ(s.size, payload.size());
+    EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+    const Slice& rep = ccr_check.v2_rep_levels();
+    const Slice& def = ccr_check.v2_def_levels();
+    ASSERT_EQ(rep.size, rl);
+    ASSERT_EQ(def.size, dl);
+    // cached v2 page is stored decompressed (threshold=100), make sure 
counter reflects it
+    EXPECT_GT(ccr_check.statistics().page_cache_decompressed_hit_counter, 0);
+    EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl));
+    EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl));
+}
+
+TEST(ParquetPageCacheTest, MultiPagesMixedV1V2CacheHit) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // Note: parquet_page_cache_decompress_threshold and 
enable_parquet_cache_compressed_pages
+    // are now BE config variables, not context fields
+
+    // Prepare a v1 uncompressed page and a v2 uncompressed page and insert 
both into cache
+    std::string path = "test_multi_pages_file";
+
+    // v1 page
+    tparquet::PageHeader hdr1;
+    hdr1.type = tparquet::PageType::DATA_PAGE;
+    hdr1.__set_compressed_page_size(4);
+    hdr1.__set_uncompressed_page_size(4);
+    hdr1.__isset.data_page_header = true;
+    hdr1.data_page_header.__set_num_values(1);
+    std::vector<uint8_t> header1_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&hdr1, &header1_bytes).ok());
+    std::vector<uint8_t> payload1 = {0x10, 0x20, 0x30, 0x40};
+    std::vector<uint8_t> cached1;
+    cached1.insert(cached1.end(), header1_bytes.begin(), header1_bytes.end());
+    cached1.insert(cached1.end(), payload1.begin(), payload1.end());
+
+    // v2 page
+    tparquet::PageHeader hdr2;
+    hdr2.type = tparquet::PageType::DATA_PAGE_V2;
+    int rl = 2;
+    int dl = 1;
+    int payload2_sz = 2;
+    hdr2.__set_compressed_page_size(rl + dl + payload2_sz);
+    hdr2.__set_uncompressed_page_size(rl + dl + payload2_sz);
+    hdr2.__isset.data_page_header_v2 = true;
+    hdr2.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+    hdr2.data_page_header_v2.__set_definition_levels_byte_length(dl);
+    hdr2.data_page_header_v2.__set_is_compressed(false);
+    hdr2.data_page_header_v2.__set_num_values(1);
+    std::vector<uint8_t> header2_bytes;
+    ASSERT_TRUE(ts.serialize(&hdr2, &header2_bytes).ok());
+    std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+    std::vector<uint8_t> payload2 = {0xAA, 0xBB};
+    std::vector<uint8_t> cached2;
+    cached2.insert(cached2.end(), header2_bytes.begin(), header2_bytes.end());
+    cached2.insert(cached2.end(), level_bytes.begin(), level_bytes.end());
+    cached2.insert(cached2.end(), payload2.begin(), payload2.end());
+
+    // Insert both pages into cache under different header offsets
+    size_t total1 = cached1.size();
+    auto* page1 = new DataPage(total1, true, segment_v2::DATA_PAGE);
+    memcpy(page1->data(), cached1.data(), total1);
+    page1->reset_size(total1);
+    PageCacheHandle h1;
+    size_t header1_start = 128;
+    int64_t mtime = 0;
+    StoragePageCache::CacheKey key1(fmt::format("{}::{}", path, mtime),
+                                    static_cast<size_t>(header1_start + 
total1), header1_start);
+    StoragePageCache::instance()->insert(key1, page1, &h1, 
segment_v2::DATA_PAGE);
+
+    size_t total2 = cached2.size();
+    auto* page2 = new DataPage(total2, true, segment_v2::DATA_PAGE);
+    memcpy(page2->data(), cached2.data(), total2);
+    page2->reset_size(total2);
+    PageCacheHandle h2;
+    size_t header2_start = 256;
+    StoragePageCache::CacheKey key2(fmt::format("{}::{}", path, mtime),
+                                    static_cast<size_t>(header2_start + 
total2), header2_start);
+    StoragePageCache::instance()->insert(key2, page2, &h2, 
segment_v2::DATA_PAGE);
+
+    // Now create readers that would lookup those cache keys
+    // Reader1 must expose header+page bytes at offset header1_start
+    std::vector<uint8_t> reader_backing1(3000, 0);
+    memcpy(reader_backing1.data() + header1_start, cached1.data(), total1);
+    FakeBufferedReader reader1(path, reader_backing1);
+    tparquet::ColumnChunk cc1;
+    cc1.meta_data.__set_data_page_offset(128);
+    cc1.meta_data.__set_total_compressed_size(total1);
+    cc1.meta_data.__set_num_values(1);
+    cc1.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+    FieldSchema field_schema1;
+    field_schema1.repetition_level = 0;
+    field_schema1.definition_level = 0;
+    ColumnChunkReader<false, false> ccr1(&reader1, &cc1, &field_schema1, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr1.init().ok());
+    // ASSERT_TRUE(ccr1.next_page().ok());
+    ASSERT_TRUE(ccr1.load_page_data().ok());
+    Slice s1 = ccr1.get_page_data();
+    ASSERT_EQ(s1.size, payload1.size());
+    EXPECT_EQ(0, memcmp(s1.data, payload1.data(), payload1.size()));
+
+    std::vector<uint8_t> reader_backing2(3000, 0);
+    memcpy(reader_backing2.data() + header2_start, cached2.data(), total2);
+    FakeBufferedReader reader2(path, reader_backing2);
+    tparquet::ColumnChunk cc2;
+    cc2.meta_data.__set_data_page_offset(256);
+    cc2.meta_data.__set_total_compressed_size(total2);
+    cc2.meta_data.__set_num_values(1);
+    cc2.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+    FieldSchema field_schema2;
+    field_schema2.repetition_level = rl;
+    field_schema2.definition_level = dl;
+    ColumnChunkReader<false, false> ccr2(&reader2, &cc2, &field_schema2, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr2.init().ok());
+    // ASSERT_TRUE(ccr2.next_page().ok());
+    ASSERT_TRUE(ccr2.load_page_data().ok());
+    Slice s2 = ccr2.get_page_data();
+    ASSERT_EQ(s2.size, payload2.size());
+    EXPECT_EQ(0, memcmp(s2.data, payload2.data(), payload2.size()));
+    const Slice& rep = ccr2.v2_rep_levels();
+    const Slice& def = ccr2.v2_def_levels();
+    ASSERT_EQ(rep.size, rl);
+    ASSERT_EQ(def.size, dl);
+    EXPECT_EQ(0, memcmp(rep.data, level_bytes.data(), rl));
+    EXPECT_EQ(0, memcmp(def.data, level_bytes.data() + rl, dl));
+}
+
+TEST(ParquetPageCacheTest, CacheMissThenHit) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // Note: parquet_page_cache_decompress_threshold and enable_parquet_cache_compressed_pages
+    // are now BE config variables, not context fields
+
+    // uncompressed v1 page
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    header.__set_compressed_page_size(4);
+    header.__set_uncompressed_page_size(4);
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+    std::vector<uint8_t> payload = {0xDE, 0xAD, 0xBE, 0xEF};
+    std::vector<uint8_t> backing(256, 0);
+    std::vector<uint8_t> cached;
+    cached.insert(cached.end(), header_bytes.begin(), header_bytes.end());
+    cached.insert(cached.end(), payload.begin(), payload.end());
+    int64_t header_offset = 64;
+    memcpy(backing.data() + header_offset, cached.data(), cached.size());
+
+    std::string path = "test_miss_then_hit";
+    FakeBufferedReader reader(path, backing);
+
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(header_offset);
+    cc.meta_data.__set_total_compressed_size(cached.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+
+    FieldSchema fs;
+    fs.repetition_level = 0;
+    fs.definition_level = 0;
+
+    // First reader: should not hit cache, but should write cache
+    ColumnChunkReader<false, false> ccr(&reader, &cc, &fs, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr.init().ok());
+    // ASSERT_TRUE(ccr.next_page().ok());
+    ASSERT_TRUE(ccr.load_page_data().ok());
+    auto& statistics = ccr.statistics();
+    EXPECT_EQ(statistics.page_cache_hit_counter, 0);
+    EXPECT_EQ(statistics.page_cache_write_counter, 1);
+
+    // Second reader: should hit cache
+    ColumnChunkReader<false, false> ccr2(&reader, &cc, &fs, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr2.init().ok());
+    // ASSERT_TRUE(ccr2.next_page().ok());
+    ASSERT_TRUE(ccr2.load_page_data().ok());
+    auto& statistics2 = ccr2.statistics();
+    EXPECT_EQ(statistics2.page_cache_hit_counter, 1);
+    EXPECT_EQ(statistics2.page_cache_decompressed_hit_counter, 1);
+}
+
+TEST(ParquetPageCacheTest, DecompressThresholdCachesCompressed) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // Note: enable_parquet_cache_compressed_pages is now a BE config variable, not a context field
+
+    // prepare a compressible payload (lots of zeros)
+    std::vector<uint8_t> payload(1024, 0);
+
+    // compress payload using snappy
+    BlockCompressionCodec* codec = nullptr;
+    ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &codec).ok());
+    faststring compressed_fast;
+    std::vector<Slice> inputs;
+    inputs.emplace_back(payload.data(), payload.size());
+    ASSERT_TRUE(codec->compress(inputs, payload.size(), &compressed_fast).ok());
+
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    header.__set_compressed_page_size(static_cast<int32_t>(compressed_fast.size()));
+    header.__set_uncompressed_page_size(static_cast<int32_t>(payload.size()));
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end());
+    file_data.insert(file_data.end(), compressed_fast.data(),
+                     compressed_fast.data() + compressed_fast.size());
+
+    std::string path = "test_threshold_file_compressed";
+    FakeBufferedReader reader(path, file_data);
+
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+    FieldSchema fs;
+    fs.repetition_level = 0;
+    fs.definition_level = 0;
+
+    // Case: very small threshold -> cache the compressed payload (smaller footprint)
+    double old_thresh = config::parquet_page_cache_decompress_threshold;
+    bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+    config::parquet_page_cache_decompress_threshold = 0.1;
+    config::enable_parquet_cache_compressed_pages = true;
+    ColumnChunkReader<false, false> ccr_small_thresh(&reader, &cc, &fs, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr_small_thresh.init().ok());
+    // ASSERT_TRUE(ccr_small_thresh.next_page().ok());
+    ASSERT_TRUE(ccr_small_thresh.load_page_data().ok());
+    EXPECT_EQ(ccr_small_thresh.statistics().page_cache_write_counter, 1);
+
+    // Inspect cache entry: payload stored should be compressed size
+    PageCacheHandle handle_small;
+    size_t file_end = header_bytes.size() + compressed_fast.size();
+    int64_t mtime = 0;
+    StoragePageCache::CacheKey key_small(fmt::format("{}::{}", path, mtime),
+                                         /*file_end_offset*/ file_end, /*header_start*/ 0);
+    bool found_small =
+            StoragePageCache::instance()->lookup(key_small, &handle_small, segment_v2::DATA_PAGE);
+    ASSERT_TRUE(found_small);
+    Slice cached_small = handle_small.data();
+    size_t header_size = header_bytes.size();
+    size_t payload_in_cache_size = cached_small.size - header_size; // no levels here
+    ASSERT_EQ(payload_in_cache_size, compressed_fast.size());
+
+    // restore config
+    config::parquet_page_cache_decompress_threshold = old_thresh;
+    config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
+
+TEST(ParquetPageCacheTest, DecompressThresholdCachesDecompressed) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    // Note: enable_parquet_cache_compressed_pages is now a BE config variable, not a context field
+
+    // prepare a compressible payload (lots of zeros)
+    std::vector<uint8_t> payload(1024, 0);
+
+    // compress payload using snappy
+    BlockCompressionCodec* codec = nullptr;
+    ASSERT_TRUE(get_block_compression_codec(segment_v2::CompressionTypePB::SNAPPY, &codec).ok());
+    faststring compressed_fast;
+    std::vector<Slice> inputs;
+    inputs.emplace_back(payload.data(), payload.size());
+    ASSERT_TRUE(codec->compress(inputs, payload.size(), &compressed_fast).ok());
+
+    tparquet::PageHeader header;
+    header.type = tparquet::PageType::DATA_PAGE;
+    header.__set_compressed_page_size(static_cast<int32_t>(compressed_fast.size()));
+    header.__set_uncompressed_page_size(static_cast<int32_t>(payload.size()));
+    header.__isset.data_page_header = true;
+    header.data_page_header.__set_num_values(1);
+
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&header, &header_bytes).ok());
+
+    std::vector<uint8_t> file_data;
+    file_data.insert(file_data.end(), header_bytes.begin(), header_bytes.end());
+    file_data.insert(file_data.end(), compressed_fast.data(),
+                     compressed_fast.data() + compressed_fast.size());
+
+    std::string path = "test_threshold_file_decompressed";
+    FakeBufferedReader reader(path, file_data);
+
+    tparquet::ColumnChunk cc;
+    cc.meta_data.__set_data_page_offset(0);
+    cc.meta_data.__set_total_compressed_size(file_data.size());
+    cc.meta_data.__set_num_values(1);
+    cc.meta_data.__set_codec(tparquet::CompressionCodec::SNAPPY);
+
+    FieldSchema fs;
+    fs.repetition_level = 0;
+    fs.definition_level = 0;
+
+    // Case: very large threshold -> cache decompressed payload
+    double old_thresh = config::parquet_page_cache_decompress_threshold;
+    bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+    config::parquet_page_cache_decompress_threshold = 100.0;
+    config::enable_parquet_cache_compressed_pages = false;
+    ColumnChunkReader<false, false> ccr_large_thresh(&reader, &cc, &fs, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr_large_thresh.init().ok());
+    // ASSERT_TRUE(ccr_large_thresh.next_page().ok());
+    ASSERT_TRUE(ccr_large_thresh.load_page_data().ok());
+    EXPECT_EQ(ccr_large_thresh.statistics().page_cache_write_counter, 1);
+
+    // Inspect cache entry for large threshold: payload stored should be uncompressed size
+    PageCacheHandle handle_large;
+    size_t file_end = header_bytes.size() + compressed_fast.size();
+    int64_t mtime = 0;
+    StoragePageCache::CacheKey key_large(fmt::format("{}::{}", path, mtime),
+                                         /*file_end_offset*/ file_end, /*header_start*/ 0);
+    bool found_large =
+            StoragePageCache::instance()->lookup(key_large, &handle_large, segment_v2::DATA_PAGE);
+    ASSERT_TRUE(found_large);
+    Slice cached_large = handle_large.data();
+    size_t payload_in_cache_size_large = cached_large.size - header_bytes.size();
+    ASSERT_EQ(payload_in_cache_size_large, payload.size());
+
+    // Verify cache hit for a new reader (should hit the decompressed entry we just created)
+    ColumnChunkReader<false, false> ccr_check(&reader, &cc, &fs, nullptr, 0, nullptr, ctx);
+    ASSERT_TRUE(ccr_check.init().ok());
+    // ASSERT_TRUE(ccr_check.next_page().ok());
+    ASSERT_TRUE(ccr_check.load_page_data().ok());
+    EXPECT_EQ(ccr_check.statistics().page_cache_hit_counter, 1);
+    // restore config
+    config::parquet_page_cache_decompress_threshold = old_thresh;
+    config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
+
+TEST(ParquetPageCacheTest, MultipleReadersShareCachedEntry) {
+    ParquetPageReadContext ctx;
+    ctx.enable_parquet_file_page_cache = true;
+    double old_thresh = config::parquet_page_cache_decompress_threshold;
+    bool old_enable_compressed = config::enable_parquet_cache_compressed_pages;
+    config::parquet_page_cache_decompress_threshold = 100.0;
+    config::enable_parquet_cache_compressed_pages = false;
+
+    // Create a v2 cached page and then instantiate multiple readers that hit the cache
+    std::string path = "test_shared_handles";
+    tparquet::PageHeader hdr;
+    hdr.type = tparquet::PageType::DATA_PAGE_V2;
+    int rl = 2;
+    int dl = 1;
+    hdr.__isset.data_page_header_v2 = true;
+    hdr.data_page_header_v2.__set_repetition_levels_byte_length(rl);
+    hdr.data_page_header_v2.__set_definition_levels_byte_length(dl);
+    hdr.data_page_header_v2.__set_is_compressed(false);
+    hdr.data_page_header_v2.__set_num_values(1);
+    std::vector<uint8_t> header_bytes;
+    ThriftSerializer ts(/*compact*/ true, /*initial*/ 256);
+    ASSERT_TRUE(ts.serialize(&hdr, &header_bytes).ok());
+    std::vector<uint8_t> level_bytes = {0x11, 0x22, 0x33};
+    std::vector<uint8_t> payload = {0x0A, 0x0B};
+    std::vector<uint8_t> cached;
+    cached.insert(cached.end(), header_bytes.begin(), header_bytes.end());
+    cached.insert(cached.end(), level_bytes.begin(), level_bytes.end());
+    cached.insert(cached.end(), payload.begin(), payload.end());
+
+    size_t total = cached.size();
+    auto* page = new DataPage(total, true, segment_v2::DATA_PAGE);
+    memcpy(page->data(), cached.data(), total);
+    page->reset_size(total);
+    PageCacheHandle handle;
+    size_t header_start = 512;
+    int64_t mtime = 0;
+    StoragePageCache::CacheKey key(fmt::format("{}::{}", path, mtime),
+                                   static_cast<size_t>(header_start + total), header_start);
+    StoragePageCache::instance()->insert(key, page, &handle, segment_v2::DATA_PAGE);
+
+    // Create multiple readers that will hit cache
+    const int N = 4;
+    for (int i = 0; i < N; ++i) {
+        std::vector<uint8_t> reader_backing(5000, 0);
+        memcpy(reader_backing.data() + header_start, cached.data(), total);
+        FakeBufferedReader reader(path, reader_backing);
+        tparquet::ColumnChunk cc;
+        cc.meta_data.__set_data_page_offset(512);
+        cc.meta_data.__set_total_compressed_size(total);
+        cc.meta_data.__set_num_values(1);
+        cc.meta_data.__set_codec(tparquet::CompressionCodec::UNCOMPRESSED);
+        FieldSchema fs;
+        fs.repetition_level = rl;
+        fs.definition_level = dl;
+        ColumnChunkReader<false, false> ccr(&reader, &cc, &fs, nullptr, 0, nullptr, ctx);
+        ASSERT_TRUE(ccr.init().ok());
+        // ASSERT_TRUE(ccr.next_page().ok());
+        ASSERT_TRUE(ccr.load_page_data().ok());
+        Slice s = ccr.get_page_data();
+        ASSERT_EQ(s.size, payload.size());
+        EXPECT_EQ(0, memcmp(s.data, payload.data(), payload.size()));
+        const Slice& rep = ccr.v2_rep_levels();
+        const Slice& def = ccr.v2_def_levels();
+        ASSERT_EQ(rep.size, rl);
+        ASSERT_EQ(def.size, dl);
+    }
+    // restore config
+    config::parquet_page_cache_decompress_threshold = old_thresh;
+    config::enable_parquet_cache_compressed_pages = old_enable_compressed;
+}
diff --git a/be/test/vec/exec/orc/orc_file_reader_test.cpp b/be/test/vec/exec/orc/orc_file_reader_test.cpp
index 9e1003c397f..4c71129cdbb 100644
--- a/be/test/vec/exec/orc/orc_file_reader_test.cpp
+++ b/be/test/vec/exec/orc/orc_file_reader_test.cpp
@@ -41,6 +41,8 @@ public:
 
     bool closed() const override { return _closed; }
 
+    int64_t mtime() const override { return 0; }
+
     void set_data(const std::string& data) { _data = data; }
 
 protected:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 361fd904ea8..b33deddb695 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -496,6 +496,9 @@ public class SessionVariable implements Serializable, Writable {
     public static final String SHOW_USER_DEFAULT_ROLE = "show_user_default_role";
 
     public static final String ENABLE_PAGE_CACHE = "enable_page_cache";
+    public static final String ENABLE_PARQUET_FILE_PAGE_CACHE = "enable_parquet_file_page_cache";
+    // public static final String PARQUET_PAGE_CACHE_DECOMPRESS_THRESHOLD = "parquet_page_cache_decompress_threshold";
+    // public static final String ENABLE_PARQUET_CACHE_COMPRESSED_PAGES = "enable_parquet_cache_compressed_pages";
 
     public static final String MINIDUMP_PATH = "minidump_path";
 
@@ -2154,6 +2157,27 @@ public class SessionVariable implements Serializable, Writable {
             needForward = true)
     public boolean enablePageCache = true;
 
+    @VariableMgr.VarAttr(
+            name = ENABLE_PARQUET_FILE_PAGE_CACHE,
+            description = {"控制是否启用 Parquet file page cache。默认为 true。",
+                    "Controls whether to use Parquet file page cache. The default is true."},
+            needForward = true)
+    public boolean enableParquetFilePageCache = true;
+
+    // @VariableMgr.VarAttr(
+    //         name = PARQUET_PAGE_CACHE_DECOMPRESS_THRESHOLD,
+    //         description = {"决定是否缓存解压后 page 的阈值,默认 1.5。",
+    //                 "Threshold ratio to decide caching decompressed parquet page, default 1.5."},
+    //         needForward = true)
+    // public double parquetPageCacheDecompressThreshold = 1.5;
+
+    // @VariableMgr.VarAttr(
+    //         name = ENABLE_PARQUET_CACHE_COMPRESSED_PAGES,
+    //         description = {"控制是否缓存压缩的 Parquet 页面,默认为 false",
+    //                 "Controls whether to cache compressed parquet pages. Default false."},
+    //         needForward = true)
+    // public boolean enableParquetCacheCompressedPages = false;
+
     @VariableMgr.VarAttr(name = ENABLE_FOLD_NONDETERMINISTIC_FN)
     public boolean enableFoldNondeterministicFn = false;
 
@@ -4876,6 +4900,8 @@ public class SessionVariable implements Serializable, Writable {
 
         tResult.setEnablePageCache(enablePageCache);
 
+        tResult.setEnableParquetFilePageCache(enableParquetFilePageCache);
+
         tResult.setFileCacheBasePath(fileCacheBasePath);
 
         tResult.setEnableInvertedIndexQuery(enableInvertedIndexQuery);
@@ -4889,6 +4915,7 @@ public class SessionVariable implements Serializable, Writable {
         tResult.setEnableParquetLazyMat(enableParquetLazyMat);
         tResult.setEnableOrcLazyMat(enableOrcLazyMat);
         tResult.setEnableParquetFilterByMinMax(enableParquetFilterByMinMax);
+        tResult.setEnableParquetFilePageCache(enableParquetFilePageCache);
         tResult.setEnableOrcFilterByMinMax(enableOrcFilterByMinMax);
         tResult.setCheckOrcInitSargsSuccess(checkOrcInitSargsSuccess);
 
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 88f063988c9..67c64195b34 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -418,6 +418,10 @@ struct TQueryOptions {
   180: optional i32 max_file_scanners_concurrency = 0;
   181: optional i32 min_file_scanners_concurrency = 0;
 
+  // Parquet page cache session options
+  // Whether to enable parquet file page cache on BE for this query
+  184: optional bool enable_parquet_file_page_cache = true;
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
