This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 212cd8fdae1 branch-3.0: [fix](group commit) reduce cpu cost for group_commit get_block #49822 (#51079)
212cd8fdae1 is described below

commit 212cd8fdae19ceae41005c8f5d295c6b8d58e52d
Author: meiyi <[email protected]>
AuthorDate: Wed May 21 09:40:26 2025 +0800

    branch-3.0: [fix](group commit) reduce cpu cost for group_commit get_block #49822 (#51079)
    
    pick https://github.com/apache/doris/pull/49822
---
 be/src/pipeline/dependency.h                       |  4 ++--
 .../pipeline/exec/group_commit_scan_operator.cpp   | 22 +++++++++++++++++-----
 be/src/pipeline/exec/group_commit_scan_operator.h  |  6 ++++--
 be/src/runtime/group_commit_mgr.cpp                |  6 ++++--
 be/src/runtime/group_commit_mgr.h                  |  4 +++-
 5 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index dd953292396..496f11f7877 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -207,7 +207,7 @@ struct RuntimeFilterTimerQueue;
 class RuntimeFilterTimer {
 public:
     RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
-                       std::shared_ptr<RuntimeFilterDependency> parent)
+                       std::shared_ptr<Dependency> parent)
             : _parent(std::move(parent)),
               _registration_time(registration_time),
               _wait_time_ms(wait_time_ms) {}
@@ -230,7 +230,7 @@ public:
 
 private:
     friend struct RuntimeFilterTimerQueue;
-    std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
+    std::shared_ptr<Dependency> _parent = nullptr;
     std::vector<std::shared_ptr<RuntimeFilterDependency>> 
_local_runtime_filter_dependencies;
     std::mutex _lock;
     int64_t _registration_time;
diff --git a/be/src/pipeline/exec/group_commit_scan_operator.cpp 
b/be/src/pipeline/exec/group_commit_scan_operator.cpp
index fbe7f3c6f22..a3da7f9c3a0 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_scan_operator.cpp
@@ -33,10 +33,9 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state, 
vectorized::Block* b
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     bool find_node = false;
-    while (!find_node && !*eos) {
-        RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, 
&find_node, eos,
-                                                                
local_state._get_block_dependency));
-    }
+    RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, 
&find_node, eos,
+                                                            
local_state._get_block_dependency,
+                                                            
local_state._timer_dependency));
     return Status::OK();
 }
 
@@ -46,8 +45,21 @@ Status GroupCommitLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     auto& p = _parent->cast<GroupCommitOperatorX>();
     _get_block_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
                                                       
"GroupCommitGetBlockDependency", true);
-    return state->exec_env()->group_commit_mgr()->get_load_block_queue(
+    _timer_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
+                                                  
"GroupCommitTimerDependency", true);
+    auto st = state->exec_env()->group_commit_mgr()->get_load_block_queue(
             p._table_id, state->fragment_instance_id(), load_block_queue, 
_get_block_dependency);
+    if (st.ok()) {
+        DCHECK(load_block_queue != nullptr);
+        _timer_dependency->block();
+        _runtime_filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+                MonotonicMillis(), 
load_block_queue->get_group_commit_interval_ms(),
+                _timer_dependency);
+        std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> timers;
+        timers.push_back(_runtime_filter_timer);
+        
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(std::move(timers));
+    }
+    return st;
 }
 
 Status GroupCommitLocalState::_process_conjuncts(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/group_commit_scan_operator.h 
b/be/src/pipeline/exec/group_commit_scan_operator.h
index 46f50f37724..bf627df64b0 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.h
+++ b/be/src/pipeline/exec/group_commit_scan_operator.h
@@ -36,9 +36,9 @@ public:
     GroupCommitLocalState(RuntimeState* state, OperatorXBase* parent)
             : ScanLocalState(state, parent) {}
     Status init(RuntimeState* state, LocalStateInfo& info) override;
-    std::shared_ptr<LoadBlockQueue> load_block_queue;
+    std::shared_ptr<LoadBlockQueue> load_block_queue = nullptr;
     std::vector<Dependency*> dependencies() const override {
-        return {_scan_dependency.get(), _get_block_dependency.get()};
+        return {_scan_dependency.get(), _get_block_dependency.get(), 
_timer_dependency.get()};
     }
 
 private:
@@ -46,6 +46,8 @@ private:
     Status _process_conjuncts(RuntimeState* state) override;
 
     std::shared_ptr<Dependency> _get_block_dependency = nullptr;
+    std::shared_ptr<Dependency> _timer_dependency = nullptr;
+    std::shared_ptr<pipeline::RuntimeFilterTimer> _runtime_filter_timer = 
nullptr;
 };
 
 class GroupCommitOperatorX final : public ScanOperatorX<GroupCommitLocalState> 
{
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 6c8604bb51f..f49d6708bcc 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -107,7 +107,8 @@ Status LoadBlockQueue::add_block(RuntimeState* 
runtime_state,
 
 Status LoadBlockQueue::get_block(RuntimeState* runtime_state, 
vectorized::Block* block,
                                  bool* find_block, bool* eos,
-                                 std::shared_ptr<pipeline::Dependency> 
get_block_dep) {
+                                 std::shared_ptr<pipeline::Dependency> 
get_block_dep,
+                                 std::shared_ptr<pipeline::Dependency> 
timer_dependency) {
     *find_block = false;
     *eos = false;
     std::unique_lock l(mutex);
@@ -143,8 +144,9 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, vectorized::Block*
                           << ", duration=" << duration << ", load_ids=" << 
get_load_ids();
             }
         }
-        if (!_load_ids_to_write_dep.empty()) {
+        if (!_need_commit && !timer_dependency->ready()) {
             get_block_dep->block();
+            VLOG_DEBUG << "block get_block for query_id=" << load_instance_id;
         }
     } else {
         const BlockData block_data = _block_queue.front();
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index c6cb34a022a..32579547893 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -73,7 +73,8 @@ public:
     Status add_block(RuntimeState* runtime_state, 
std::shared_ptr<vectorized::Block> block,
                      bool write_wal, UniqueId& load_id);
     Status get_block(RuntimeState* runtime_state, vectorized::Block* block, 
bool* find_block,
-                     bool* eos, std::shared_ptr<pipeline::Dependency> 
get_block_dep);
+                     bool* eos, std::shared_ptr<pipeline::Dependency> 
get_block_dep,
+                     std::shared_ptr<pipeline::Dependency> timer_dependency);
     bool contain_load_id(const UniqueId& load_id);
     Status add_load_id(const UniqueId& load_id,
                        const std::shared_ptr<pipeline::Dependency> 
put_block_dep);
@@ -88,6 +89,7 @@ public:
     bool has_enough_wal_disk_space(size_t estimated_wal_bytes);
     void append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep);
     void append_read_dependency(std::shared_ptr<pipeline::Dependency> 
read_dep);
+    int64_t get_group_commit_interval_ms() { return _group_commit_interval_ms; 
};
 
     std::string debug_string() const {
         fmt::memory_buffer debug_string_buffer;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to