This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5e25544fe9a [Improvement](load) Do not block in group commit scan
operator (#36730)
5e25544fe9a is described below
commit 5e25544fe9ac2b7165e97637a8bc446ca1f69ef2
Author: Gabriel <[email protected]>
AuthorDate: Tue Jun 25 10:18:52 2024 +0800
[Improvement](load) Do not block in group commit scan operator (#36730)
---
.../pipeline/exec/group_commit_scan_operator.cpp | 7 ++-
be/src/pipeline/exec/group_commit_scan_operator.h | 6 ++
be/src/runtime/group_commit_mgr.cpp | 73 +++++++++++++---------
be/src/runtime/group_commit_mgr.h | 10 ++-
4 files changed, 60 insertions(+), 36 deletions(-)
diff --git a/be/src/pipeline/exec/group_commit_scan_operator.cpp
b/be/src/pipeline/exec/group_commit_scan_operator.cpp
index 5c3f7e84ee8..3e6ad62c5dc 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_scan_operator.cpp
@@ -33,7 +33,8 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state,
vectorized::Block* b
auto& local_state = get_local_state(state);
bool find_node = false;
while (!find_node && !*eos) {
- RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block,
&find_node, eos));
+ RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block,
&find_node, eos,
+
local_state._get_block_dependency));
}
return Status::OK();
}
@@ -42,8 +43,10 @@ Status GroupCommitLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(ScanLocalState<GroupCommitLocalState>::init(state, info));
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<GroupCommitOperatorX>();
+ _get_block_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+
"GroupCommitGetBlockDependency", true);
return state->exec_env()->group_commit_mgr()->get_load_block_queue(
- p._table_id, state->fragment_instance_id(), load_block_queue);
+ p._table_id, state->fragment_instance_id(), load_block_queue,
_get_block_dependency);
}
Status GroupCommitLocalState::_process_conjuncts(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/group_commit_scan_operator.h
b/be/src/pipeline/exec/group_commit_scan_operator.h
index b4767d60543..46f50f37724 100644
--- a/be/src/pipeline/exec/group_commit_scan_operator.h
+++ b/be/src/pipeline/exec/group_commit_scan_operator.h
@@ -37,9 +37,15 @@ public:
: ScanLocalState(state, parent) {}
Status init(RuntimeState* state, LocalStateInfo& info) override;
std::shared_ptr<LoadBlockQueue> load_block_queue;
+ std::vector<Dependency*> dependencies() const override {
+ return {_scan_dependency.get(), _get_block_dependency.get()};
+ }
private:
+ friend class GroupCommitOperatorX;
Status _process_conjuncts(RuntimeState* state) override;
+
+ std::shared_ptr<Dependency> _get_block_dependency = nullptr;
};
class GroupCommitOperatorX final : public ScanOperatorX<GroupCommitLocalState>
{
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 7a17fd88939..ab11b795ed5 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -100,12 +100,15 @@ Status LoadBlockQueue::add_block(RuntimeState*
runtime_state,
_need_commit = true;
}
}
- _get_cond.notify_all();
+ for (auto read_dep : _read_deps) {
+ read_dep->set_ready();
+ }
return Status::OK();
}
Status LoadBlockQueue::get_block(RuntimeState* runtime_state,
vectorized::Block* block,
- bool* find_block, bool* eos) {
+ bool* find_block, bool* eos,
+ std::shared_ptr<pipeline::Dependency>
get_block_dep) {
*find_block = false;
*eos = false;
std::unique_lock l(mutex);
@@ -116,34 +119,32 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
_need_commit = true;
}
}
- while (!runtime_state->is_cancelled() && status.ok() &&
_block_queue.empty() &&
- (!_need_commit || (_need_commit &&
!_load_ids_to_write_dep.empty()))) {
- auto left_milliseconds = _group_commit_interval_ms;
- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::steady_clock::now() - _start_time)
- .count();
- if (!_need_commit) {
- left_milliseconds = _group_commit_interval_ms - duration;
- if (left_milliseconds <= 0) {
- _need_commit = true;
- break;
- }
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() - _start_time)
+ .count();
+ if (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty()
&& !_need_commit) {
+ if (_group_commit_interval_ms - duration <= 0) {
+ _need_commit = true;
} else {
- if (duration >= 10 * _group_commit_interval_ms) {
- std::stringstream ss;
- ss << "[";
- for (auto& id : _load_ids_to_write_dep) {
- ss << id.first.to_string() << ", ";
- }
- ss << "]";
- LOG(INFO) << "find one group_commit need to commit, txn_id="
<< txn_id
- << ", label=" << label << ", instance_id=" <<
load_instance_id
- << ", duration=" << duration << ", load_ids=" <<
ss.str()
- << ", runtime_state=" << runtime_state;
+ get_block_dep->block();
+ return Status::OK();
+ }
+ } else if (!runtime_state->is_cancelled() && status.ok() &&
_block_queue.empty() &&
+ _need_commit && !_load_ids_to_write_dep.empty()) {
+ if (duration >= 10 * _group_commit_interval_ms) {
+ std::stringstream ss;
+ ss << "[";
+ for (auto& id : _load_ids_to_write_dep) {
+ ss << id.first.to_string() << ", ";
}
+ ss << "]";
+ LOG(INFO) << "find one group_commit need to commit, txn_id=" <<
txn_id
+ << ", label=" << label << ", instance_id=" <<
load_instance_id
+ << ", duration=" << duration << ", load_ids=" << ss.str()
+ << ", runtime_state=" << runtime_state;
}
- _get_cond.wait_for(l, std::chrono::milliseconds(
- std::min(left_milliseconds,
static_cast<int64_t>(10000))));
+ get_block_dep->block();
+ return Status::OK();
}
if (runtime_state->is_cancelled()) {
auto st = runtime_state->cancel_reason();
@@ -194,7 +195,9 @@ void LoadBlockQueue::remove_load_id(const UniqueId&
load_id) {
if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) {
_load_ids_to_write_dep[load_id]->set_always_ready();
_load_ids_to_write_dep.erase(load_id);
- _get_cond.notify_all();
+ for (auto read_dep : _read_deps) {
+ read_dep->set_ready();
+ }
}
}
@@ -543,7 +546,8 @@ Status GroupCommitTable::_exec_plan_fragment(int64_t db_id,
int64_t table_id,
}
Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id,
- std::shared_ptr<LoadBlockQueue>&
load_block_queue) {
+ std::shared_ptr<LoadBlockQueue>&
load_block_queue,
+
std::shared_ptr<pipeline::Dependency> get_block_dep) {
std::unique_lock l(_lock);
auto it = _load_block_queues.find(instance_id);
if (it == _load_block_queues.end()) {
@@ -551,6 +555,7 @@ Status GroupCommitTable::get_load_block_queue(const
TUniqueId& instance_id,
" not found");
}
load_block_queue = it->second;
+ load_block_queue->append_read_dependency(get_block_dep);
return Status::OK();
}
@@ -594,7 +599,8 @@ Status GroupCommitMgr::get_first_block_load_queue(
}
Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId&
instance_id,
- std::shared_ptr<LoadBlockQueue>&
load_block_queue) {
+ std::shared_ptr<LoadBlockQueue>&
load_block_queue,
+
std::shared_ptr<pipeline::Dependency> get_block_dep) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard<std::mutex> l(_lock);
@@ -605,7 +611,7 @@ Status GroupCommitMgr::get_load_block_queue(int64_t
table_id, const TUniqueId& i
}
group_commit_table = it->second;
}
- return group_commit_table->get_load_block_queue(instance_id,
load_block_queue);
+ return group_commit_table->get_load_block_queue(instance_id,
load_block_queue, get_block_dep);
}
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
@@ -637,6 +643,11 @@ void
LoadBlockQueue::append_dependency(std::shared_ptr<pipeline::Dependency> fin
}
}
+void
LoadBlockQueue::append_read_dependency(std::shared_ptr<pipeline::Dependency>
read_dep) {
+ std::lock_guard<std::mutex> lock(mutex);
+ _read_deps.push_back(read_dep);
+}
+
bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) {
DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", {
return false; });
auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index f290d2aa6bb..702ebb9c746 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -72,7 +72,7 @@ public:
Status add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block,
bool write_wal, UniqueId& load_id);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
bool* find_block,
- bool* eos);
+ bool* eos, std::shared_ptr<pipeline::Dependency>
get_block_dep);
Status add_load_id(const UniqueId& load_id,
const std::shared_ptr<pipeline::Dependency>
put_block_dep);
void remove_load_id(const UniqueId& load_id);
@@ -85,6 +85,7 @@ public:
Status close_wal();
bool has_enough_wal_disk_space(size_t estimated_wal_bytes);
void append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep);
+ void append_read_dependency(std::shared_ptr<pipeline::Dependency>
read_dep);
std::string debug_string() const {
fmt::memory_buffer debug_string_buffer;
@@ -120,6 +121,7 @@ private:
// the set of load ids of all blocks in this queue
std::map<UniqueId, std::shared_ptr<pipeline::Dependency>>
_load_ids_to_write_dep;
+ std::vector<std::shared_ptr<pipeline::Dependency>> _read_deps;
std::list<BlockData> _block_queue;
// wal
@@ -159,7 +161,8 @@ public:
std::shared_ptr<pipeline::Dependency>
create_plan_dep,
std::shared_ptr<pipeline::Dependency>
put_block_dep);
Status get_load_block_queue(const TUniqueId& instance_id,
- std::shared_ptr<LoadBlockQueue>&
load_block_queue);
+ std::shared_ptr<LoadBlockQueue>&
load_block_queue,
+ std::shared_ptr<pipeline::Dependency>
get_block_dep);
private:
Status _create_group_commit_load(int be_exe_version,
@@ -195,7 +198,8 @@ public:
// used when init group_commit_scan_node
Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
- std::shared_ptr<LoadBlockQueue>&
load_block_queue);
+ std::shared_ptr<LoadBlockQueue>&
load_block_queue,
+ std::shared_ptr<pipeline::Dependency>
get_block_dep);
Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t
base_schema_version,
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]