This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 495f3894508 [fix](memory) Fix BlockCompression memory tracking #33841
495f3894508 is described below

commit 495f3894508621b2e655c0da62b6bfbdebf5a51e
Author: Xinyi Zou <[email protected]>
AuthorDate: Sat Apr 20 00:44:11 2024 +0800

    [fix](memory) Fix BlockCompression memory tracking #33841
---
 be/src/runtime/exec_env.h         |  6 +++++
 be/src/runtime/exec_env_init.cpp  |  2 ++
 be/src/util/block_compression.cpp | 54 ++++++++++++++++++++++++++++++++++-----
 3 files changed, 56 insertions(+), 6 deletions(-)

diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 5d03bca2bb9..6b1e3fb476a 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -181,6 +181,9 @@ public:
     std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() {
         return _point_query_executor_mem_tracker;
     }
+    std::shared_ptr<MemTrackerLimiter> block_compression_mem_tracker() {
+        return _block_compression_mem_tracker;
+    }
     std::shared_ptr<MemTrackerLimiter> rowid_storage_reader_tracker() {
         return _rowid_storage_reader_tracker;
     }
@@ -351,7 +354,10 @@ private:
     std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
     // Count the memory consumption of segment compaction tasks.
     std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker;
+
+    // Tracking memory may be shared between multiple queries.
     std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _block_compression_mem_tracker;
 
     // TODO, looking forward to more accurate tracking.
     std::shared_ptr<MemTrackerLimiter> _rowid_storage_reader_tracker;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index d90a92038d1..2c63b9cf3ca 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -540,6 +540,8 @@ void ExecEnv::init_mem_tracker() {
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SegCompaction");
     _point_query_executor_mem_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor");
+    _block_compression_mem_tracker =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "BlockCompression");
     _rowid_storage_reader_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader");
     _subcolumns_tree_tracker =
diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp
index d5d7fb5ae59..f3b1e781e7e 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -111,9 +111,15 @@ public:
         static Lz4BlockCompression s_instance;
         return &s_instance;
     }
-    ~Lz4BlockCompression() { _ctx_pool.clear(); }
+    ~Lz4BlockCompression() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
+        _ctx_pool.clear();
+    }
 
     Status compress(const Slice& input, faststring* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         if (input.size > INT_MAX) {
             return Status::InvalidArgument(
                     "LZ4 not support those case(input.size>INT_MAX), maybe you should change "
@@ -159,6 +165,8 @@ public:
     }
 
     Status decompress(const Slice& input, Slice* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         auto decompressed_len =
                 LZ4_decompress_safe(input.data, output->data, input.size, output->size);
         if (decompressed_len < 0) {
@@ -210,6 +218,8 @@ public:
         return &s_instance;
     }
     Status decompress(const Slice& input, Slice* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         RETURN_IF_ERROR(Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor));
         size_t input_bytes_read = 0;
         size_t decompressed_len = 0;
@@ -263,6 +273,8 @@ public:
         return &s_instance;
     }
     ~Lz4fBlockCompression() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         _ctx_c_pool.clear();
         _ctx_d_pool.clear();
     }
@@ -289,6 +301,8 @@ public:
 private:
     Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                      faststring* output) {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         std::unique_ptr<CContext> context;
         RETURN_IF_ERROR(_acquire_compression_ctx(context));
         bool compress_failed = false;
@@ -347,6 +361,8 @@ private:
     }
 
     Status _decompress(const Slice& input, Slice* output) {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         bool decompress_failed = false;
         std::unique_ptr<DContext> context;
         RETURN_IF_ERROR(_acquire_decompression_ctx(context));
@@ -471,9 +487,15 @@ public:
         static Lz4HCBlockCompression s_instance;
         return &s_instance;
     }
-    ~Lz4HCBlockCompression() { _ctx_pool.clear(); }
+    ~Lz4HCBlockCompression() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
+        _ctx_pool.clear();
+    }
 
     Status compress(const Slice& input, faststring* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         std::unique_ptr<Context> context;
         RETURN_IF_ERROR(_acquire_compression_ctx(context));
         bool compress_failed = false;
@@ -511,6 +533,8 @@ public:
     }
 
     Status decompress(const Slice& input, Slice* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         auto decompressed_len =
                 LZ4_decompress_safe(input.data, output->data, input.size, output->size);
         if (decompressed_len < 0) {
@@ -630,6 +654,8 @@ public:
     ~SnappyBlockCompression() override {}
 
     Status compress(const Slice& input, faststring* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         size_t max_len = max_compressed_len(input.size);
         output->resize(max_len);
         Slice s(*output);
@@ -640,6 +666,8 @@ public:
     }
 
     Status decompress(const Slice& input, Slice* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         if (!snappy::RawUncompress(input.data, input.size, output->data)) {
             return Status::InvalidArgument("Fail to do Snappy decompress");
         }
@@ -671,6 +699,8 @@ public:
     ~ZlibBlockCompression() {}
 
     Status compress(const Slice& input, faststring* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         size_t max_len = max_compressed_len(input.size);
         output->resize(max_len);
         Slice s(*output);
@@ -685,6 +715,8 @@ public:
 
     Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                     faststring* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         size_t max_len = max_compressed_len(uncompressed_size);
         output->resize(max_len);
 
@@ -725,6 +757,8 @@ public:
     }
 
     Status decompress(const Slice& input, Slice* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         size_t input_size = input.size;
         auto zres =
                 ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
@@ -775,7 +809,8 @@ public:
         return &s_instance;
     }
     ~ZstdBlockCompression() {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context);
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         _ctx_c_pool.clear();
         _ctx_d_pool.clear();
     }
@@ -791,7 +826,8 @@ public:
     //  https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
     Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                     faststring* output) override {
-        _query_thread_context.init();
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         std::unique_ptr<CContext> context;
         RETURN_IF_ERROR(_acquire_compression_ctx(context));
         bool compress_failed = false;
@@ -868,7 +904,8 @@ public:
     }
 
     Status decompress(const Slice& input, Slice* output) override {
-        _query_thread_context.init();
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         std::unique_ptr<DContext> context;
         bool decompress_failed = false;
         RETURN_IF_ERROR(_acquire_decompression_ctx(context));
@@ -953,7 +990,6 @@ private:
 
     mutable std::mutex _ctx_d_mutex;
     mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
-    QueryThreadContext _query_thread_context;
 };
 
 class GzipBlockCompression : public ZlibBlockCompression {
@@ -965,6 +1001,8 @@ public:
     ~GzipBlockCompression() override = default;
 
     Status decompress(const Slice& input, Slice* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         z_stream z_strm = {};
         z_strm.zalloc = Z_NULL;
         z_strm.zfree = Z_NULL;
@@ -1046,6 +1084,8 @@ public:
     ~GzipBlockCompressionByLibdeflate() override = default;
 
     Status decompress(const Slice& input, Slice* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         if (input.empty()) {
             output->size = 0;
             return Status::OK();
@@ -1078,6 +1118,8 @@ public:
     }
     size_t max_compressed_len(size_t len) override { return 0; };
     Status decompress(const Slice& input, Slice* output) override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->block_compression_mem_tracker());
         auto* input_ptr = input.data;
         auto remain_input_size = input.size;
         auto* output_ptr = output->data;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to