This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 212cd8fdae1 branch-3.0: [fix](group commit) reduce cpu cost for group_commit get_block #49822 (#51079)
212cd8fdae1 is described below
commit 212cd8fdae19ceae41005c8f5d295c6b8d58e52d
Author: meiyi <[email protected]>
AuthorDate: Wed May 21 09:40:26 2025 +0800
branch-3.0: [fix](group commit) reduce cpu cost for group_commit get_block #49822 (#51079)
pick https://github.com/apache/doris/pull/49822
---
be/src/pipeline/dependency.h | 4 ++--
.../pipeline/exec/group_commit_scan_operator.cpp | 22 +++++++++++++++++-----
be/src/pipeline/exec/group_commit_scan_operator.h | 6 ++++--
be/src/runtime/group_commit_mgr.cpp | 6 ++++--
be/src/runtime/group_commit_mgr.h | 4 +++-
5 files changed, 30 insertions(+), 12 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index dd953292396..496f11f7877 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -207,7 +207,7 @@ struct RuntimeFilterTimerQueue;
class RuntimeFilterTimer {
public:
RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
- std::shared_ptr<RuntimeFilterDependency> parent)
+ std::shared_ptr<Dependency> parent)
: _parent(std::move(parent)),
_registration_time(registration_time),
_wait_time_ms(wait_time_ms) {}
@@ -230,7 +230,7 @@ public:
private:
friend struct RuntimeFilterTimerQueue;
- std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
+ std::shared_ptr<Dependency> _parent = nullptr;
std::vector<std::shared_ptr<RuntimeFilterDependency>>
_local_runtime_filter_dependencies;
std::mutex _lock;
int64_t _registration_time;
diff --git a/be/src/pipeline/exec/group_commit_scan_operator.cpp b/be/src/pipeline/exec/group_commit_scan_operator.cpp
index fbe7f3c6f22..a3da7f9c3a0 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_scan_operator.cpp
@@ -33,10 +33,9 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state, vectorized::Block* b
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
bool find_node = false;
- while (!find_node && !*eos) {
- RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block,
&find_node, eos,
-
local_state._get_block_dependency));
- }
+ RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block,
&find_node, eos,
+
local_state._get_block_dependency,
+
local_state._timer_dependency));
return Status::OK();
}
@@ -46,8 +45,21 @@ Status GroupCommitLocalState::init(RuntimeState* state, LocalStateInfo& info) {
auto& p = _parent->cast<GroupCommitOperatorX>();
_get_block_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
"GroupCommitGetBlockDependency", true);
- return state->exec_env()->group_commit_mgr()->get_load_block_queue(
+ _timer_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+
"GroupCommitTimerDependency", true);
+ auto st = state->exec_env()->group_commit_mgr()->get_load_block_queue(
p._table_id, state->fragment_instance_id(), load_block_queue,
_get_block_dependency);
+ if (st.ok()) {
+ DCHECK(load_block_queue != nullptr);
+ _timer_dependency->block();
+ _runtime_filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+ MonotonicMillis(),
load_block_queue->get_group_commit_interval_ms(),
+ _timer_dependency);
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> timers;
+ timers.push_back(_runtime_filter_timer);
+
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(std::move(timers));
+ }
+ return st;
}
Status GroupCommitLocalState::_process_conjuncts(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/group_commit_scan_operator.h b/be/src/pipeline/exec/group_commit_scan_operator.h
index 46f50f37724..bf627df64b0 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.h
+++ b/be/src/pipeline/exec/group_commit_scan_operator.h
@@ -36,9 +36,9 @@ public:
GroupCommitLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalState(state, parent) {}
Status init(RuntimeState* state, LocalStateInfo& info) override;
- std::shared_ptr<LoadBlockQueue> load_block_queue;
+ std::shared_ptr<LoadBlockQueue> load_block_queue = nullptr;
std::vector<Dependency*> dependencies() const override {
- return {_scan_dependency.get(), _get_block_dependency.get()};
+ return {_scan_dependency.get(), _get_block_dependency.get(),
_timer_dependency.get()};
}
private:
@@ -46,6 +46,8 @@ private:
Status _process_conjuncts(RuntimeState* state) override;
std::shared_ptr<Dependency> _get_block_dependency = nullptr;
+ std::shared_ptr<Dependency> _timer_dependency = nullptr;
+ std::shared_ptr<pipeline::RuntimeFilterTimer> _runtime_filter_timer =
nullptr;
};
class GroupCommitOperatorX final : public ScanOperatorX<GroupCommitLocalState>
{
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 6c8604bb51f..f49d6708bcc 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -107,7 +107,8 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
Status LoadBlockQueue::get_block(RuntimeState* runtime_state,
vectorized::Block* block,
bool* find_block, bool* eos,
- std::shared_ptr<pipeline::Dependency>
get_block_dep) {
+ std::shared_ptr<pipeline::Dependency>
get_block_dep,
+ std::shared_ptr<pipeline::Dependency>
timer_dependency) {
*find_block = false;
*eos = false;
std::unique_lock l(mutex);
@@ -143,8 +144,9 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
<< ", duration=" << duration << ", load_ids=" <<
get_load_ids();
}
}
- if (!_load_ids_to_write_dep.empty()) {
+ if (!_need_commit && !timer_dependency->ready()) {
get_block_dep->block();
+ VLOG_DEBUG << "block get_block for query_id=" << load_instance_id;
}
} else {
const BlockData block_data = _block_queue.front();
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index c6cb34a022a..32579547893 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -73,7 +73,8 @@ public:
Status add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block,
bool write_wal, UniqueId& load_id);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
bool* find_block,
- bool* eos, std::shared_ptr<pipeline::Dependency>
get_block_dep);
+ bool* eos, std::shared_ptr<pipeline::Dependency>
get_block_dep,
+ std::shared_ptr<pipeline::Dependency> timer_dependency);
bool contain_load_id(const UniqueId& load_id);
Status add_load_id(const UniqueId& load_id,
const std::shared_ptr<pipeline::Dependency>
put_block_dep);
@@ -88,6 +89,7 @@ public:
bool has_enough_wal_disk_space(size_t estimated_wal_bytes);
void append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep);
void append_read_dependency(std::shared_ptr<pipeline::Dependency>
read_dep);
+ int64_t get_group_commit_interval_ms() { return _group_commit_interval_ms;
};
std::string debug_string() const {
fmt::memory_buffer debug_string_buffer;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]