This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2581501d60c [Branch3.0](Parquet) add a memory tracker to parquet meta (#50468)
2581501d60c is described below

commit 2581501d60ca6e5681f9bbb70b43f671637c5470
Author: Tiewei Fang <[email protected]>
AuthorDate: Tue May 6 09:57:51 2025 +0800

    [Branch3.0](Parquet) add a memory tracker to parquet meta (#50468)
    
    picked from: #49037
---
 be/src/runtime/exec_env.h                                 |  4 ++++
 be/src/runtime/exec_env_init.cpp                          |  2 ++
 be/src/vec/exec/format/parquet/parquet_thrift_util.h      |  2 +-
 be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp | 11 ++++++++++-
 be/src/vec/exec/format/parquet/vparquet_file_metadata.h   |  6 ++++--
 5 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 35b0c842d70..44cda3f81de 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -214,6 +214,7 @@ public:
         return _subcolumns_tree_tracker;
     }
     std::shared_ptr<MemTrackerLimiter> s3_file_buffer_tracker() { return _s3_file_buffer_tracker; }
+    std::shared_ptr<MemTrackerLimiter> parquet_meta_tracker() { return _parquet_meta_tracker; }
 
     ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
     ThreadPool* buffered_reader_prefetch_thread_pool() {
@@ -409,6 +410,9 @@ private:
     std::shared_ptr<MemTrackerLimiter> _subcolumns_tree_tracker;
     std::shared_ptr<MemTrackerLimiter> _s3_file_buffer_tracker;
 
+    // Tracking memory consumption of parquet meta
+    std::shared_ptr<MemTrackerLimiter> _parquet_meta_tracker;
+
     std::unique_ptr<ThreadPool> _send_batch_thread_pool;
     // Threadpool used to prefetch remote file for buffered reader
     std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index a34848ffd9f..393d3fa3d19 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -598,6 +598,8 @@ void ExecEnv::init_mem_tracker() {
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer");
     _stream_load_pipe_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe");
+    _parquet_meta_tracker =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::METADATA, "ParquetMeta");
 }
 
 Status ExecEnv::_check_deploy_mode() {
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index b767f177f4a..f6426a64495 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -75,7 +75,7 @@ static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_m
     tparquet::FileMetaData t_metadata;
     // deserialize footer
     RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata));
-    *file_metadata = new FileMetaData(t_metadata);
+    *file_metadata = new FileMetaData(t_metadata, metadata_size);
     RETURN_IF_ERROR((*file_metadata)->init_schema());
     *meta_size = PARQUET_FOOTER_SIZE + metadata_size;
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
index 7a164c306c2..98de497e320 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
@@ -22,11 +22,20 @@
 #include <sstream>
 #include <vector>
 
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
 #include "schema_desc.h"
 
 namespace doris::vectorized {
 
-FileMetaData::FileMetaData(tparquet::FileMetaData& metadata) : _metadata(metadata) {}
+FileMetaData::FileMetaData(tparquet::FileMetaData& metadata, size_t mem_size)
+        : _metadata(metadata), _mem_size(mem_size) {
+    ExecEnv::GetInstance()->parquet_meta_tracker()->consume(mem_size);
+}
+
+FileMetaData::~FileMetaData() {
+    ExecEnv::GetInstance()->parquet_meta_tracker()->release(_mem_size);
+}
 
 Status FileMetaData::init_schema() {
     if (_metadata.schema[0].num_children <= 0) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
index 5d745a0db62..d1ebb06957d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
@@ -27,8 +27,8 @@ namespace doris::vectorized {
 
 class FileMetaData {
 public:
-    FileMetaData(tparquet::FileMetaData& metadata);
-    ~FileMetaData() = default;
+    FileMetaData(tparquet::FileMetaData& metadata, size_t mem_size);
+    ~FileMetaData();
     Status init_schema();
     const FieldDescriptor& schema() const { return _schema; }
     const tparquet::FileMetaData& to_thrift();
@@ -36,10 +36,12 @@ public:
         _schema.iceberg_sanitize(read_columns);
     }
     std::string debug_string() const;
+    size_t get_mem_size() const { return _mem_size; }
 
 private:
     tparquet::FileMetaData _metadata;
     FieldDescriptor _schema;
+    size_t _mem_size;
 };
 
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to