This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c037302392b [fix](group commit) fix group commit can not get block 
queue and may stuck (#37260)
c037302392b is described below

commit c037302392b72ff1342191ad08afe4ceac5f357c
Author: meiyi <[email protected]>
AuthorDate: Sat Jul 6 16:43:07 2024 +0800

    [fix](group commit) fix group commit can not get block queue and may stuck 
(#37260)
    
    ## Proposed changes
    
    1. fix the `can not get block queue` error that occurs at low frequency
    2. fix a case where `get_block` may get stuck
---
 .../exec/group_commit_block_sink_operator.cpp      |  11 +-
 be/src/runtime/group_commit_mgr.cpp                | 158 +++++++++++++--------
 be/src/runtime/group_commit_mgr.h                  |  14 +-
 3 files changed, 117 insertions(+), 66 deletions(-)

diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp 
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 17088b37c3e..424ede07be5 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -27,7 +27,12 @@ namespace doris::pipeline {
 GroupCommitBlockSinkLocalState::~GroupCommitBlockSinkLocalState() {
     if (_load_block_queue) {
         _remove_estimated_wal_bytes();
-        
_load_block_queue->remove_load_id(_parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
+        [[maybe_unused]] auto st = _load_block_queue->remove_load_id(
+                _parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
+    } else {
+        _state->exec_env()->group_commit_mgr()->remove_load_id(
+                _parent->cast<GroupCommitBlockSinkOperatorX>()._table_id,
+                _parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
     }
 }
 
@@ -221,7 +226,7 @@ Status 
GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
         if (dp->param<int64_t>("table_id", -1) == _table_id) {
             if (_load_block_queue) {
                 _remove_estimated_wal_bytes();
-                _load_block_queue->remove_load_id(p._load_id);
+                [[maybe_unused]] auto st = 
_load_block_queue->remove_load_id(p._load_id);
             }
             if 
(ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
                         std ::chrono ::seconds(60)) == std ::future_status 
::ready) {
@@ -304,7 +309,7 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* 
state, vectorized::Bloc
                 RETURN_IF_ERROR(local_state._add_blocks(state, true));
             }
             local_state._remove_estimated_wal_bytes();
-            local_state._load_block_queue->remove_load_id(_load_id);
+            [[maybe_unused]] auto st = 
local_state._load_block_queue->remove_load_id(_load_id);
         }
         return Status::OK();
     };
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 464f9f51221..54f25a708a4 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -112,58 +112,48 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
     *find_block = false;
     *eos = false;
     std::unique_lock l(mutex);
-    if (!_need_commit) {
-        if 
(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 -
-                                                                  _start_time)
-                    .count() >= _group_commit_interval_ms) {
-            _need_commit = true;
-        }
+    if (runtime_state->is_cancelled() || !status.ok()) {
+        auto st = runtime_state->cancel_reason();
+        _cancel_without_lock(st);
+        return status;
     }
     auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                             std::chrono::steady_clock::now() - _start_time)
                             .count();
-    if (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() 
&& !_need_commit) {
-        if (_group_commit_interval_ms - duration <= 0) {
-            _need_commit = true;
-        } else {
-            get_block_dep->block();
-            return Status::OK();
+    if (!_need_commit && duration >= _group_commit_interval_ms) {
+        _need_commit = true;
+    }
+    auto get_load_ids = [&]() {
+        std::stringstream ss;
+        ss << "[";
+        for (auto& id : _load_ids_to_write_dep) {
+            ss << id.first.to_string() << ", ";
         }
-    } else if (!runtime_state->is_cancelled() && status.ok() && 
_block_queue.empty() &&
-               _need_commit && !_load_ids_to_write_dep.empty()) {
-        if (duration >= 10 * _group_commit_interval_ms) {
-            std::stringstream ss;
-            ss << "[";
-            for (auto& id : _load_ids_to_write_dep) {
-                ss << id.first.to_string() << ", ";
+        ss << "]";
+        return ss.str();
+    };
+    if (_block_queue.empty()) {
+        if (_need_commit && duration >= 10 * _group_commit_interval_ms) {
+            auto last_print_duration = 
std::chrono::duration_cast<std::chrono::milliseconds>(
+                                               
std::chrono::steady_clock::now() - _last_print_time)
+                                               .count();
+            if (last_print_duration >= 5000) {
+                _last_print_time = std::chrono::steady_clock::now();
+                LOG(INFO) << "find one group_commit need to commit, txn_id=" 
<< txn_id
+                          << ", label=" << label << ", instance_id=" << 
load_instance_id
+                          << ", duration=" << duration << ", load_ids=" << 
get_load_ids();
             }
-            ss << "]";
-            LOG(INFO) << "find one group_commit need to commit, txn_id=" << 
txn_id
-                      << ", label=" << label << ", instance_id=" << 
load_instance_id
-                      << ", duration=" << duration << ", load_ids=" << ss.str()
-                      << ", runtime_state=" << runtime_state;
         }
-        get_block_dep->block();
-        return Status::OK();
-    }
-    if (runtime_state->is_cancelled()) {
-        auto st = runtime_state->cancel_reason();
-        _cancel_without_lock(st);
-        return status;
-    }
-    if (!_block_queue.empty()) {
+        if (!_load_ids_to_write_dep.empty()) {
+            get_block_dep->block();
+        }
+    } else {
         const BlockData block_data = _block_queue.front();
         block->swap(*block_data.block);
         *find_block = true;
         _block_queue.pop_front();
         int before_block_queues_bytes = _all_block_queues_bytes->load();
         _all_block_queues_bytes->fetch_sub(block_data.block_bytes, 
std::memory_order_relaxed);
-        std::stringstream ss;
-        ss << "[";
-        for (const auto& id : _load_ids_to_write_dep) {
-            ss << id.first.to_string() << ", ";
-        }
-        ss << "]";
         VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
                    << "block queue size is " << _block_queue.size() << ", 
block rows is "
                    << block->rows() << ", block bytes is " << block->bytes()
@@ -172,9 +162,8 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
                    << ", after remove block, all block queues bytes is "
                    << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
                    << ", label=" << label << ", instance_id=" << 
load_instance_id
-                   << ", load_ids=" << ss.str() << ", runtime_state=" << 
runtime_state
-                   << ", the block is " << block->dump_data() << ", the block 
column size is "
-                   << block->columns_bytes();
+                   << ", load_ids=" << get_load_ids() << ", the block is " << 
block->dump_data()
+                   << ", the block column size is " << block->columns_bytes();
     }
     if (_block_queue.empty() && _need_commit && 
_load_ids_to_write_dep.empty()) {
         *eos = true;
@@ -190,7 +179,7 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
     return Status::OK();
 }
 
-void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
+Status LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
     std::unique_lock l(mutex);
     if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) {
         _load_ids_to_write_dep[load_id]->set_always_ready();
@@ -198,7 +187,15 @@ void LoadBlockQueue::remove_load_id(const UniqueId& 
load_id) {
         for (auto read_dep : _read_deps) {
             read_dep->set_ready();
         }
+        return Status::OK();
     }
+    return Status::NotFound<false>("load_id=" + load_id.to_string() +
+                                   " not in block queue, label=" + label);
+}
+
+bool LoadBlockQueue::contain_load_id(const UniqueId& load_id) {
+    std::unique_lock l(mutex);
+    return _load_ids_to_write_dep.find(load_id) != 
_load_ids_to_write_dep.end();
 }
 
 Status LoadBlockQueue::add_load_id(const UniqueId& load_id,
@@ -250,6 +247,9 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) 
{
     for (auto& id : _load_ids_to_write_dep) {
         id.second->set_always_ready();
     }
+    for (auto read_dep : _read_deps) {
+        read_dep->set_ready();
+    }
 }
 
 Status GroupCommitTable::get_first_block_load_queue(
@@ -261,6 +261,14 @@ Status GroupCommitTable::get_first_block_load_queue(
     DCHECK(table_id == _table_id);
     std::unique_lock l(_lock);
     auto try_to_get_matched_queue = [&]() -> Status {
+        for (const auto& [_, inner_block_queue] : _load_block_queues) {
+            if (inner_block_queue->contain_load_id(load_id)) {
+                load_block_queue = inner_block_queue;
+                label = inner_block_queue->label;
+                txn_id = inner_block_queue->txn_id;
+                return Status::OK();
+            }
+        }
         for (const auto& [_, inner_block_queue] : _load_block_queues) {
             if (!inner_block_queue->need_commit()) {
                 if (base_schema_version == inner_block_queue->schema_version) {
@@ -285,28 +293,38 @@ Status GroupCommitTable::get_first_block_load_queue(
         return Status::OK();
     }
     create_plan_dep->block();
-    _create_plan_deps.push_back(create_plan_dep);
+    _create_plan_deps.emplace(load_id,
+                              std::make_tuple(create_plan_dep, put_block_dep, 
base_schema_version));
     if (!_is_creating_plan_fragment) {
         _is_creating_plan_fragment = true;
-        RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, 
mem_tracker,
-                                                   dep = create_plan_dep] {
-            Defer defer {[&, dep = dep]() {
-                std::unique_lock l(_lock);
-                for (auto it : _create_plan_deps) {
-                    it->set_ready();
-                }
-                std::vector<std::shared_ptr<pipeline::Dependency>> 
{}.swap(_create_plan_deps);
-                _is_creating_plan_fragment = false;
-            }};
-            auto st = _create_group_commit_load(be_exe_version, mem_tracker);
-            if (!st.ok()) {
-                LOG(WARNING) << "create group commit load error, st=" << 
st.to_string();
-            }
-        }));
+        RETURN_IF_ERROR(
+                _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep 
= create_plan_dep] {
+                    Defer defer {[&, dep = dep]() {
+                        std::unique_lock l(_lock);
+                        for (auto it : _create_plan_deps) {
+                            std::get<0>(it.second)->set_ready();
+                        }
+                        _create_plan_deps.clear();
+                        _is_creating_plan_fragment = false;
+                    }};
+                    auto st = _create_group_commit_load(be_exe_version, 
mem_tracker);
+                    if (!st.ok()) {
+                        LOG(WARNING) << "create group commit load error, st=" 
<< st.to_string();
+                    }
+                }));
     }
     return try_to_get_matched_queue();
 }
 
+void GroupCommitTable::remove_load_id(const UniqueId& load_id) {
+    std::unique_lock l(_lock);
+    for (const auto& [_, inner_block_queue] : _load_block_queues) {
+        if (inner_block_queue->remove_load_id(load_id).ok()) {
+            return;
+        }
+    }
+}
+
 Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
                                                    
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
     Status st = Status::OK();
@@ -378,6 +396,21 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version,
                     
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
                     be_exe_version));
             _load_block_queues.emplace(instance_id, load_block_queue);
+
+            std::vector<UniqueId> success_load_ids;
+            for (const auto& [id, load_info] : _create_plan_deps) {
+                auto create_dep = std::get<0>(load_info);
+                auto put_dep = std::get<1>(load_info);
+                if (load_block_queue->schema_version == 
std::get<2>(load_info)) {
+                    if (load_block_queue->add_load_id(id, put_dep).ok()) {
+                        create_dep->set_ready();
+                        success_load_ids.emplace_back(id);
+                    }
+                }
+            }
+            for (const auto& id2 : success_load_ids) {
+                _create_plan_deps.erase(id2);
+            }
         }
     }
     st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, 
result.pipeline_params);
@@ -596,6 +629,13 @@ Status GroupCommitMgr::get_load_block_queue(int64_t 
table_id, const TUniqueId& i
     return group_commit_table->get_load_block_queue(instance_id, 
load_block_queue, get_block_dep);
 }
 
+void GroupCommitMgr::remove_load_id(int64_t table_id, const UniqueId& load_id) 
{
+    std::lock_guard wlock(_lock);
+    if (_table_map.find(table_id) != _table_map.end()) {
+        _table_map.find(table_id)->second->remove_load_id(load_id);
+    }
+}
+
 Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
                                   const std::string& import_label, WalManager* 
wal_manager,
                                   std::vector<TSlotDescriptor>& slot_desc, int 
be_exe_version) {
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index e9ea152ea5c..16c7e0c24d3 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -66,6 +66,7 @@ public:
               
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
               _group_commit_interval_ms(group_commit_interval_ms),
               _start_time(std::chrono::steady_clock::now()),
+              _last_print_time(_start_time),
               _group_commit_data_bytes(group_commit_data_bytes),
               _all_block_queues_bytes(all_block_queues_bytes) {};
 
@@ -73,9 +74,10 @@ public:
                      bool write_wal, UniqueId& load_id);
     Status get_block(RuntimeState* runtime_state, vectorized::Block* block, 
bool* find_block,
                      bool* eos, std::shared_ptr<pipeline::Dependency> 
get_block_dep);
+    bool contain_load_id(const UniqueId& load_id);
     Status add_load_id(const UniqueId& load_id,
                        const std::shared_ptr<pipeline::Dependency> 
put_block_dep);
-    void remove_load_id(const UniqueId& load_id);
+    Status remove_load_id(const UniqueId& load_id);
     void cancel(const Status& st);
     bool need_commit() { return _need_commit; }
 
@@ -133,6 +135,7 @@ private:
     // commit by time interval, can be changed by 'ALTER TABLE my_table SET 
("group_commit_interval_ms"="1000");'
     int64_t _group_commit_interval_ms;
     std::chrono::steady_clock::time_point _start_time;
+    std::chrono::steady_clock::time_point _last_print_time;
     // commit by data size
     int64_t _group_commit_data_bytes;
     int64_t _data_bytes = 0;
@@ -140,8 +143,6 @@ private:
     // memory back pressure, memory consumption of all tables' load block 
queues
     std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
     std::condition_variable _get_cond;
-    static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIME = 1000;      // 1s
-    static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIMEOUT = 120000; // 120s
 };
 
 class GroupCommitTable {
@@ -164,6 +165,7 @@ public:
     Status get_load_block_queue(const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
                                 std::shared_ptr<pipeline::Dependency> 
get_block_dep);
+    void remove_load_id(const UniqueId& load_id);
 
 private:
     Status _create_group_commit_load(int be_exe_version,
@@ -186,7 +188,10 @@ private:
     // fragment_instance_id to load_block_queue
     std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> 
_load_block_queues;
     bool _is_creating_plan_fragment = false;
-    std::vector<std::shared_ptr<pipeline::Dependency>> _create_plan_deps;
+    // user_load_id -> <create_plan_dep, put_block_dep, base_schema_version>
+    std::unordered_map<UniqueId, 
std::tuple<std::shared_ptr<pipeline::Dependency>,
+                                            
std::shared_ptr<pipeline::Dependency>, int64_t>>
+            _create_plan_deps;
 };
 
 class GroupCommitMgr {
@@ -208,6 +213,7 @@ public:
                                       std::shared_ptr<pipeline::Dependency> 
create_plan_dep,
                                       std::shared_ptr<pipeline::Dependency> 
put_block_dep,
                                       std::string& label, int64_t& txn_id);
+    void remove_load_id(int64_t table_id, const UniqueId& load_id);
     std::promise<Status> debug_promise;
     std::future<Status> debug_future = debug_promise.get_future();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to