This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new af0bca698ad [Branch-2.1](Parquet) add a memory tracker to parquet meta (#50130)
af0bca698ad is described below
commit af0bca698ad7c45407546ab678ad7358a97c568c
Author: Tiewei Fang <[email protected]>
AuthorDate: Wed Apr 23 11:58:58 2025 +0800
[Branch-2.1](Parquet) add a memory tracker to parquet meta (#50130)
---
be/src/runtime/exec_env.h | 4 ++++
be/src/runtime/exec_env_init.cpp | 2 ++
be/src/vec/exec/format/parquet/parquet_thrift_util.h | 2 +-
be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp | 10 +++++++++-
be/src/vec/exec/format/parquet/vparquet_file_metadata.h | 6 ++++--
5 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index a6f84e1db8f..6e97e78768b 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -203,6 +203,7 @@ public:
return _subcolumns_tree_tracker;
}
std::shared_ptr<MemTrackerLimiter> s3_file_buffer_tracker() { return _s3_file_buffer_tracker; }
+ std::shared_ptr<MemTrackerLimiter> parquet_meta_tracker() { return _parquet_meta_tracker; }
ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
ThreadPool* buffered_reader_prefetch_thread_pool() {
@@ -389,6 +390,9 @@ private:
std::shared_ptr<MemTrackerLimiter> _subcolumns_tree_tracker;
std::shared_ptr<MemTrackerLimiter> _s3_file_buffer_tracker;
+ // Tracking memory consumption of parquet meta
+ std::shared_ptr<MemTrackerLimiter> _parquet_meta_tracker;
+
std::unique_ptr<ThreadPool> _send_batch_thread_pool;
// Threadpool used to prefetch remote file for buffered reader
std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 1f4678d7ba7..d1d26d64600 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -548,6 +548,8 @@ void ExecEnv::init_mem_tracker() {
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer");
_stream_load_pipe_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe");
+ _parquet_meta_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::QUERY, "ParquetMeta");
}
void ExecEnv::_register_metrics() {
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index 64ccda6fe2e..ef22036366e 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -75,7 +75,7 @@ static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_m
tparquet::FileMetaData t_metadata;
// deserialize footer
RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata));
- *file_metadata = new FileMetaData(t_metadata);
+ *file_metadata = new FileMetaData(t_metadata, metadata_size);
RETURN_IF_ERROR((*file_metadata)->init_schema());
*meta_size = PARQUET_FOOTER_SIZE + metadata_size;
return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
index 7a164c306c2..c55b3b953e9 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp
@@ -22,11 +22,19 @@
#include <sstream>
#include <vector>
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "schema_desc.h"
namespace doris::vectorized {
+FileMetaData::FileMetaData(tparquet::FileMetaData& metadata, size_t mem_size)
+ : _metadata(metadata), _mem_size(mem_size) {
+ ExecEnv::GetInstance()->parquet_meta_tracker()->consume(mem_size);
+}
-FileMetaData::FileMetaData(tparquet::FileMetaData& metadata) : _metadata(metadata) {}
+FileMetaData::~FileMetaData() {
+ ExecEnv::GetInstance()->parquet_meta_tracker()->release(_mem_size);
+}
Status FileMetaData::init_schema() {
if (_metadata.schema[0].num_children <= 0) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
index 5d745a0db62..d1ebb06957d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
@@ -27,8 +27,8 @@ namespace doris::vectorized {
class FileMetaData {
public:
- FileMetaData(tparquet::FileMetaData& metadata);
- ~FileMetaData() = default;
+ FileMetaData(tparquet::FileMetaData& metadata, size_t mem_size);
+ ~FileMetaData();
Status init_schema();
const FieldDescriptor& schema() const { return _schema; }
const tparquet::FileMetaData& to_thrift();
@@ -36,10 +36,12 @@ public:
_schema.iceberg_sanitize(read_columns);
}
std::string debug_string() const;
+ size_t get_mem_size() const { return _mem_size; }
private:
tparquet::FileMetaData _metadata;
FieldDescriptor _schema;
+ size_t _mem_size;
};
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]