This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1c435c297c5 [Improvement](load) Do no block in group commit sink
(#36612)
1c435c297c5 is described below
commit 1c435c297c58a3447cb0497ef46471e4d874bb61
Author: Gabriel <[email protected]>
AuthorDate: Mon Jun 24 09:29:21 2024 +0800
[Improvement](load) Do no block in group commit sink (#36612)
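The group commit sink operator creates an internal load task before it
starts. Currently this is a blocking step: the task is created via an RPC,
which is not allowed in the pipeline engine.
This PR turns that blocking step into a dependency: the load task is created
asynchronously on a thread pool, and the sink waits on a dependency that is
set ready once the creation attempt finishes.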
---
.../exec/group_commit_block_sink_operator.cpp | 80 ++++++++++++--------
.../exec/group_commit_block_sink_operator.h | 5 +-
be/src/runtime/group_commit_mgr.cpp | 85 +++++++++++-----------
be/src/runtime/group_commit_mgr.h | 22 +++---
4 files changed, 107 insertions(+), 85 deletions(-)
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 5de2e667d4e..3953eb63c4d 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -52,9 +52,28 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState*
state) {
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state,
_output_vexpr_ctxs[i]));
}
+ _write_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+
"GroupCommitBlockSinkDependency", true);
+
+ WARN_IF_ERROR(_initialize_load_queue(), "");
return Status::OK();
}
+Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
+ auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
+ TUniqueId load_id;
+ load_id.__set_hi(p._load_id.hi);
+ load_id.__set_lo(p._load_id.lo);
+ if (_state->exec_env()->wal_mgr()->is_running()) {
+
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
+ p._db_id, p._table_id, p._base_schema_version, load_id,
_load_block_queue,
+ _state->be_exec_version(), _state->query_mem_tracker(),
_write_dependency));
+ return Status::OK();
+ } else {
+ return Status::InternalError("be is stopping");
+ }
+}
+
Status GroupCommitBlockSinkLocalState::close(RuntimeState* state, Status
close_status) {
if (_closed) {
return Status::OK();
@@ -79,8 +98,9 @@ Status GroupCommitBlockSinkLocalState::close(RuntimeState*
state, Status close_s
std::string GroupCommitBlockSinkLocalState::debug_string(int
indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
- fmt::format_to(debug_string_buffer, ", _load_block_queue: ({})",
- _load_block_queue ? _load_block_queue->debug_string() :
"NULL");
+ fmt::format_to(debug_string_buffer, ", _load_block_queue: ({}),
_base_schema_version: {}",
+ _load_block_queue ? _load_block_queue->debug_string() :
"NULL",
+
_parent->cast<GroupCommitBlockSinkOperatorX>()._base_schema_version);
return fmt::to_string(debug_string_buffer);
}
@@ -164,37 +184,31 @@ Status
GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
TUniqueId load_id;
load_id.__set_hi(p._load_id.hi);
load_id.__set_lo(p._load_id.lo);
- if (_load_block_queue == nullptr) {
- if (_state->exec_env()->wal_mgr()->is_running()) {
-
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
- p._db_id, p._table_id, p._base_schema_version, load_id,
_load_block_queue,
- _state->be_exec_version(), _state->query_mem_tracker()));
- if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
- size_t estimated_wal_bytes =
-
_calculate_estimated_wal_bytes(is_blocks_contain_all_load_data);
- _group_commit_mode =
-
_load_block_queue->has_enough_wal_disk_space(estimated_wal_bytes)
- ? TGroupCommitMode::ASYNC_MODE
- : TGroupCommitMode::SYNC_MODE;
- if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
- LOG(INFO) << "Load id=" << print_id(_state->query_id())
- << ", use group commit label=" <<
_load_block_queue->label
- << " will not write wal because wal disk space
usage reach max "
- "limit. Detail info: "
- <<
_state->exec_env()->wal_mgr()->get_wal_dirs_info_string();
- } else {
- _estimated_wal_bytes = estimated_wal_bytes;
- }
- }
- if (_load_block_queue->wait_internal_group_commit_finish ||
- _group_commit_mode == TGroupCommitMode::SYNC_MODE) {
- _load_block_queue->append_dependency(_finish_dependency);
+ if (_state->exec_env()->wal_mgr()->is_running()) {
+ if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
+ size_t estimated_wal_bytes =
+
_calculate_estimated_wal_bytes(is_blocks_contain_all_load_data);
+ _group_commit_mode =
_load_block_queue->has_enough_wal_disk_space(estimated_wal_bytes)
+ ? TGroupCommitMode::ASYNC_MODE
+ : TGroupCommitMode::SYNC_MODE;
+ if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
+ LOG(INFO) << "Load id=" << print_id(_state->query_id())
+ << ", use group commit label=" <<
_load_block_queue->label
+ << " will not write wal because wal disk space usage
reach max "
+ "limit. Detail info: "
+ <<
_state->exec_env()->wal_mgr()->get_wal_dirs_info_string();
+ } else {
+ _estimated_wal_bytes = estimated_wal_bytes;
}
- _state->set_import_label(_load_block_queue->label);
- _state->set_wal_id(_load_block_queue->txn_id);
- } else {
- return Status::InternalError("be is stopping");
}
+ if (_load_block_queue->wait_internal_group_commit_finish ||
+ _group_commit_mode == TGroupCommitMode::SYNC_MODE) {
+ _load_block_queue->append_dependency(_finish_dependency);
+ }
+ _state->set_import_label(_load_block_queue->label);
+ _state->set_wal_id(_load_block_queue->txn_id);
+ } else {
+ return Status::InternalError("be is stopping");
}
for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
RETURN_IF_ERROR(_load_block_queue->add_block(
@@ -263,6 +277,10 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState*
state, vectorized::Bloc
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)input_block->rows());
SCOPED_CONSUME_MEM_TRACKER(local_state._mem_tracker.get());
+ if (!local_state._load_block_queue) {
+ RETURN_IF_ERROR(local_state._initialize_load_queue());
+ }
+ DCHECK(local_state._load_block_queue);
Status status = Status::OK();
auto wind_up = [&]() -> Status {
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h
b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index f26e65b97da..27e344deca6 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -46,6 +46,7 @@ public:
Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get();
}
+ std::vector<Dependency*> dependencies() const override { return
{_write_dependency.get()}; }
std::string debug_string(int indentation_level) const override;
private:
@@ -54,12 +55,13 @@ private:
Status _add_blocks(RuntimeState* state, bool
is_blocks_contain_all_load_data);
size_t _calculate_estimated_wal_bytes(bool
is_blocks_contain_all_load_data);
void _remove_estimated_wal_bytes();
+ Status _initialize_load_queue();
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor;
- std::shared_ptr<LoadBlockQueue> _load_block_queue;
+ std::shared_ptr<LoadBlockQueue> _load_block_queue = nullptr;
// used to calculate if meet the max filter ratio
std::vector<std::shared_ptr<vectorized::Block>> _blocks;
bool _is_block_appended = false;
@@ -73,6 +75,7 @@ private:
Bitmap _filter_bitmap;
int64_t _table_id;
std::shared_ptr<Dependency> _finish_dependency;
+ std::shared_ptr<Dependency> _write_dependency = nullptr;
};
class GroupCommitBlockSinkOperatorX final
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index d21535d6351..d5e2651fd4d 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -250,46 +250,48 @@ void LoadBlockQueue::_cancel_without_lock(const Status&
st) {
Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
- std::shared_ptr<MemTrackerLimiter> mem_tracker) {
+ std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> dep) {
DCHECK(table_id == _table_id);
- {
- std::unique_lock l(_lock);
- for (int i = 0; i < 3; i++) {
- bool is_schema_version_match = true;
- for (const auto& [_, inner_block_queue] : _load_block_queues) {
- if (!inner_block_queue->need_commit()) {
- if (base_schema_version ==
inner_block_queue->schema_version) {
- if (inner_block_queue->add_load_id(load_id).ok()) {
- load_block_queue = inner_block_queue;
- return Status::OK();
- }
- } else if (base_schema_version <
inner_block_queue->schema_version) {
- is_schema_version_match = false;
+ std::unique_lock l(_lock);
+ auto try_to_get_matched_queue = [&]() -> Status {
+ for (const auto& [_, inner_block_queue] : _load_block_queues) {
+ if (!inner_block_queue->need_commit()) {
+ if (base_schema_version == inner_block_queue->schema_version) {
+ if (inner_block_queue->add_load_id(load_id).ok()) {
+ load_block_queue = inner_block_queue;
+ return Status::OK();
}
+ } else {
+ return Status::DataQualityError<false>(
+ "schema version not match, maybe a schema change
is in process. "
+ "Please "
+ "retry this load manually.");
}
}
- if (!is_schema_version_match) {
- return Status::DataQualityError<false>(
- "schema version not match, maybe a schema change is in
process. Please "
- "retry this load manually.");
- }
- if (!_is_creating_plan_fragment) {
- _is_creating_plan_fragment = true;
- RETURN_IF_ERROR(_thread_pool->submit_func([this,
be_exe_version, mem_tracker] {
- auto st = _create_group_commit_load(be_exe_version,
mem_tracker);
- if (!st.ok()) {
- LOG(WARNING) << "create group commit load error, st="
<< st.to_string();
- std::unique_lock l(_lock);
- _is_creating_plan_fragment = false;
- _cv.notify_all();
- }
- }));
- }
- _cv.wait_for(l, std::chrono::seconds(4));
}
+ return Status::InternalError<false>("can not get a block queue for
table_id: " +
+ std::to_string(_table_id));
+ };
+
+ if (try_to_get_matched_queue().ok()) {
+ return Status::OK();
}
- return Status::InternalError<false>("can not get a block queue for
table_id: " +
- std::to_string(_table_id));
+ if (!_is_creating_plan_fragment) {
+ _is_creating_plan_fragment = true;
+ dep->block();
+ RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version,
mem_tracker, dep = dep] {
+ Defer defer {[&, dep = dep]() {
+ dep->set_ready();
+ std::unique_lock l(_lock);
+ _is_creating_plan_fragment = false;
+ }};
+ auto st = _create_group_commit_load(be_exe_version, mem_tracker);
+ if (!st.ok()) {
+ LOG(WARNING) << "create group commit load error, st=" <<
st.to_string();
+ }
+ }));
+ }
+ return try_to_get_matched_queue();
}
Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
@@ -378,8 +380,6 @@ Status GroupCommitTable::_create_group_commit_load(int
be_exe_version,
be_exe_version));
}
_load_block_queues.emplace(instance_id, load_block_queue);
- _is_creating_plan_fragment = false;
- _cv.notify_all();
}
}
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline,
result.params,
@@ -565,12 +565,10 @@ void GroupCommitMgr::stop() {
LOG(INFO) << "GroupCommitMgr is stopped";
}
-Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t
table_id,
- int64_t base_schema_version,
- const UniqueId& load_id,
-
std::shared_ptr<LoadBlockQueue>& load_block_queue,
- int be_exe_version,
-
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
+Status GroupCommitMgr::get_first_block_load_queue(
+ int64_t db_id, int64_t table_id, int64_t base_schema_version, const
UniqueId& load_id,
+ std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
+ std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> dep) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
@@ -582,7 +580,8 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t
db_id, int64_t table_i
group_commit_table = _table_map[table_id];
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
- table_id, base_schema_version, load_id, load_block_queue,
be_exe_version, mem_tracker));
+ table_id, base_schema_version, load_id, load_block_queue,
be_exe_version, mem_tracker,
+ dep));
return Status::OK();
}
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index 65f9f09670c..679da81f75f 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -87,13 +87,14 @@ public:
std::string debug_string() const {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer,
- "load_instance_id={}, label={}, txn_id={}, "
- "wait_internal_group_commit_finish={},
data_size_condition={}, "
- "group_commit_load_count={}, process_finish={}",
- load_instance_id.to_string(), label, txn_id,
- wait_internal_group_commit_finish, data_size_condition,
- group_commit_load_count, process_finish.load());
+ fmt::format_to(
+ debug_string_buffer,
+ "load_instance_id={}, label={}, txn_id={}, "
+ "wait_internal_group_commit_finish={}, data_size_condition={},
"
+ "group_commit_load_count={}, process_finish={},
_need_commit={}, schema_version={}",
+ load_instance_id.to_string(), label, txn_id,
wait_internal_group_commit_finish,
+ data_size_condition, group_commit_load_count,
process_finish.load(), _need_commit,
+ schema_version);
return fmt::to_string(debug_string_buffer);
}
@@ -154,7 +155,8 @@ public:
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
int be_exe_version,
- std::shared_ptr<MemTrackerLimiter>
mem_tracker);
+ std::shared_ptr<MemTrackerLimiter>
mem_tracker,
+ std::shared_ptr<pipeline::Dependency>
dep);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue);
@@ -178,7 +180,6 @@ private:
int64_t _table_id;
std::mutex _lock;
- std::condition_variable _cv;
// fragment_instance_id to load_block_queue
std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>>
_load_block_queues;
bool _is_creating_plan_fragment = false;
@@ -198,7 +199,8 @@ public:
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
int be_exe_version,
- std::shared_ptr<MemTrackerLimiter>
mem_tracker);
+ std::shared_ptr<MemTrackerLimiter>
mem_tracker,
+ std::shared_ptr<pipeline::Dependency>
dep);
std::promise<Status> debug_promise;
std::future<Status> debug_future = debug_promise.get_future();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]