This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 387e33fa347 [enhancement](group commit)Add group commit block queues 
memory back pressure (#26045)
387e33fa347 is described below

commit 387e33fa3476dc43c39176c44c87744dfc5fc415
Author: abmdocrt <[email protected]>
AuthorDate: Wed Nov 1 16:29:45 2023 +0800

    [enhancement](group commit)Add group commit block queues memory back 
pressure (#26045)
---
 be/src/common/config.cpp            |  3 +++
 be/src/common/config.h              |  3 +++
 be/src/runtime/group_commit_mgr.cpp | 34 +++++++++++++++++++++++++++-------
 be/src/runtime/group_commit_mgr.h   | 31 +++++++++++++++++++++++++------
 4 files changed, 58 insertions(+), 13 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index deb99682fed..fae3369b36c 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1116,6 +1116,9 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment, 
"true");
 // Dir of default timezone files
 DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
 
+// Max size(bytes) of group commit queues, used for mem back pressure.
+DEFINE_Int32(group_commit_max_queue_size, "65536");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9855ba0ffec..393e7de3492 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1186,6 +1186,9 @@ DECLARE_Bool(ignore_always_true_predicate_for_segment);
 // Dir of default timezone files
 DECLARE_String(default_tzfiles_path);
 
+// Max size(bytes) of group commit queues, used for mem back pressure.
+DECLARE_Int32(group_commit_max_queue_size);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 1bdb5b586f0..718d7f19aa4 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -23,7 +23,11 @@
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/Types_types.h>
 
+#include <memory>
+#include <numeric>
+
 #include "client_cache.h"
+#include "common/config.h"
 #include "common/object_pool.h"
 #include "exec/data_sink.h"
 #include "io/fs/stream_load_pipe.h"
@@ -34,6 +38,7 @@
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_context.h"
 #include "util/thrift_rpc_helper.h"
+#include "vec/core/future_block.h"
 #include "vec/exec/scan/new_file_scan_node.h"
 #include "vec/sink/group_commit_block_sink.h"
 
@@ -45,10 +50,16 @@ Status 
LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block)
     DCHECK(block->get_schema_version() == schema_version);
     std::unique_lock l(*_mutex);
     RETURN_IF_ERROR(_status);
+    while (*_all_block_queues_bytes > config::group_commit_max_queue_size) {
+        _put_cond.wait_for(
+                l, 
std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
+    }
     if (block->rows() > 0) {
         _block_queue.push_back(block);
+        *_all_block_queues_bytes += block->bytes();
+        *_single_block_queue_bytes += block->bytes();
     }
-    _cv->notify_one();
+    _get_cond.notify_all();
     return Status::OK();
 }
 
@@ -67,6 +78,7 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, 
bool* find_block, boo
     }
     while (_status.ok() && _block_queue.empty() &&
            (!need_commit || (need_commit && !_load_ids.empty()))) {
+        CHECK(*_single_block_queue_bytes == 0);
         auto left_milliseconds = config::group_commit_interval_ms;
         if (!need_commit) {
             left_milliseconds = config::group_commit_interval_ms -
@@ -79,9 +91,9 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, 
bool* find_block, boo
             }
         }
 #if !defined(USE_BTHREAD_SCANNER)
-        _cv->wait_for(l, std::chrono::milliseconds(left_milliseconds));
+        _get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
 #else
-        _cv->wait_for(l, left_milliseconds * 1000);
+        _get_cond.wait_for(l, left_milliseconds * 1000);
 #endif
     }
     if (!_block_queue.empty()) {
@@ -90,12 +102,16 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, 
bool* find_block, boo
         fblock->swap_future_block(future_block);
         *find_block = true;
         _block_queue.pop_front();
+        *_all_block_queues_bytes -= fblock->bytes();
+        *_single_block_queue_bytes -= block->bytes();
     }
     if (_block_queue.empty() && need_commit && _load_ids.empty()) {
+        CHECK(*_single_block_queue_bytes == 0);
         *eos = true;
     } else {
         *eos = false;
     }
+    _put_cond.notify_all();
     return Status::OK();
 }
 
@@ -103,7 +119,7 @@ void LoadBlockQueue::remove_load_id(const UniqueId& 
load_id) {
     std::unique_lock l(*_mutex);
     if (_load_ids.find(load_id) != _load_ids.end()) {
         _load_ids.erase(load_id);
-        _cv->notify_one();
+        _get_cond.notify_all();
     }
 }
 
@@ -126,6 +142,8 @@ void LoadBlockQueue::cancel(const Status& st) {
             auto& future_block = _block_queue.front();
             std::unique_lock<doris::Mutex> l0(*(future_block->lock));
             future_block->set_result(st, future_block->rows(), 0);
+            *_all_block_queues_bytes -= future_block->bytes();
+            *_single_block_queue_bytes -= future_block->bytes();
             future_block->cv->notify_all();
         }
         _block_queue.pop_front();
@@ -248,8 +266,8 @@ Status GroupCommitTable::_create_group_commit_load(
                << ", txn_id=" << txn_id << ", instance_id=" << 
print_id(instance_id)
                << ", is_pipeline=" << is_pipeline;
     {
-        load_block_queue =
-                std::make_shared<LoadBlockQueue>(instance_id, label, txn_id, 
schema_version);
+        load_block_queue = std::make_shared<LoadBlockQueue>(
+                instance_id, label, txn_id, schema_version, 
_all_block_queues_bytes);
         std::unique_lock l(_lock);
         _load_block_queues.emplace(instance_id, load_block_queue);
         _need_plan_fragment = false;
@@ -398,6 +416,7 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : 
_exec_env(exec_env) {
                               .set_min_threads(1)
                               
.set_max_threads(config::group_commit_insert_threads)
                               .build(&_thread_pool));
+    _all_block_queues_bytes = std::make_shared<std::atomic_size_t>(0);
 }
 
 GroupCommitMgr::~GroupCommitMgr() {
@@ -536,7 +555,8 @@ Status GroupCommitMgr::get_first_block_load_queue(
         std::lock_guard wlock(_lock);
         if (_table_map.find(table_id) == _table_map.end()) {
             _table_map.emplace(table_id, std::make_shared<GroupCommitTable>(
-                                                 _exec_env, 
_thread_pool.get(), db_id, table_id));
+                                                 _exec_env, 
_thread_pool.get(), db_id, table_id,
+                                                 _all_block_queues_bytes));
         }
         group_commit_table = _table_map[table_id];
     }
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index aa8d05534ca..9bbbc5da619 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -19,6 +19,9 @@
 
 #include <gen_cpp/PaloInternalService_types.h>
 
+#include <atomic>
+#include <memory>
+
 #include "common/status.h"
 #include "io/fs/stream_load_pipe.h"
 #include "util/lock.h"
@@ -41,14 +44,16 @@ class StreamLoadPipe;
 class LoadBlockQueue {
 public:
     LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, 
int64_t txn_id,
-                   int64_t schema_version)
+                   int64_t schema_version,
+                   std::shared_ptr<std::atomic_size_t> all_block_queues_bytes)
             : load_instance_id(load_instance_id),
               label(label),
               txn_id(txn_id),
               schema_version(schema_version),
-              _start_time(std::chrono::steady_clock::now()) {
+              _start_time(std::chrono::steady_clock::now()),
+              _all_block_queues_bytes(all_block_queues_bytes) {
         _mutex = std::make_shared<doris::Mutex>();
-        _cv = std::make_shared<doris::ConditionVariable>();
+        _single_block_queue_bytes = std::make_shared<std::atomic_size_t>(0);
     };
 
     Status add_block(std::shared_ptr<vectorized::FutureBlock> block);
@@ -57,6 +62,7 @@ public:
     void remove_load_id(const UniqueId& load_id);
     void cancel(const Status& st);
 
+    static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
     UniqueId load_instance_id;
     std::string label;
     int64_t txn_id;
@@ -67,19 +73,28 @@ private:
     std::chrono::steady_clock::time_point _start_time;
 
     std::shared_ptr<doris::Mutex> _mutex;
-    std::shared_ptr<doris::ConditionVariable> _cv;
+    doris::ConditionVariable _put_cond;
+    doris::ConditionVariable _get_cond;
     // the set of load ids of all blocks in this queue
     std::set<UniqueId> _load_ids;
     std::list<std::shared_ptr<vectorized::FutureBlock>> _block_queue;
 
     Status _status = Status::OK();
+    // memory consumption of all tables' load block queues, used for back 
pressure.
+    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
+    // memory consumption of one load block queue, used for correctness check.
+    std::shared_ptr<std::atomic_size_t> _single_block_queue_bytes;
 };
 
 class GroupCommitTable {
 public:
     GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool, 
int64_t db_id,
-                     int64_t table_id)
-            : _exec_env(exec_env), _thread_pool(thread_pool), _db_id(db_id), 
_table_id(table_id) {};
+                     int64_t table_id, std::shared_ptr<std::atomic_size_t> 
all_block_queue_bytes)
+            : _exec_env(exec_env),
+              _thread_pool(thread_pool),
+              _db_id(db_id),
+              _table_id(table_id),
+              _all_block_queues_bytes(all_block_queue_bytes) {};
     Status get_first_block_load_queue(int64_t table_id,
                                       std::shared_ptr<vectorized::FutureBlock> 
block,
                                       std::shared_ptr<LoadBlockQueue>& 
load_block_queue);
@@ -105,6 +120,8 @@ private:
     // fragment_instance_id to load_block_queue
     std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> 
_load_block_queues;
     bool _need_plan_fragment = false;
+    // memory consumption of all tables' load block queues, used for back 
pressure.
+    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
 };
 
 class GroupCommitMgr {
@@ -142,6 +159,8 @@ private:
     // thread pool to handle insert into: append data to pipe
     std::unique_ptr<doris::ThreadPool> _insert_into_thread_pool;
     std::unique_ptr<doris::ThreadPool> _thread_pool;
+    // memory consumption of all tables' load block queues, used for back 
pressure.
+    std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
 };
 
 } // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to