This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c037302392b [fix](group commit) fix group commit can not get block
queue and may stuck (#37260)
c037302392b is described below
commit c037302392b72ff1342191ad08afe4ceac5f357c
Author: meiyi <[email protected]>
AuthorDate: Sat Jul 6 16:43:07 2024 +0800
[fix](group commit) fix group commit can not get block queue and may stuck
(#37260)
## Proposed changes
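1. Fix the `can not get block queue` error when loads arrive at a low frequency:
   a load that is waiting for a new group commit plan is now added directly to
   the newly created block queue, and a load that is already registered in a
   queue is matched to that queue first.
2. Fix `get_block` possibly getting stuck: cancelling a queue now also wakes its
   read dependencies, and `get_block` only blocks while some load is still
   writing to the queue.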
---
.../exec/group_commit_block_sink_operator.cpp | 11 +-
be/src/runtime/group_commit_mgr.cpp | 158 +++++++++++++--------
be/src/runtime/group_commit_mgr.h | 14 +-
3 files changed, 117 insertions(+), 66 deletions(-)
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 17088b37c3e..424ede07be5 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -27,7 +27,12 @@ namespace doris::pipeline {
GroupCommitBlockSinkLocalState::~GroupCommitBlockSinkLocalState() {
if (_load_block_queue) {
_remove_estimated_wal_bytes();
-
_load_block_queue->remove_load_id(_parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
+ [[maybe_unused]] auto st = _load_block_queue->remove_load_id(
+ _parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
+ } else {
+ _state->exec_env()->group_commit_mgr()->remove_load_id(
+ _parent->cast<GroupCommitBlockSinkOperatorX>()._table_id,
+ _parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
}
}
@@ -221,7 +226,7 @@ Status
GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
if (dp->param<int64_t>("table_id", -1) == _table_id) {
if (_load_block_queue) {
_remove_estimated_wal_bytes();
- _load_block_queue->remove_load_id(p._load_id);
+ [[maybe_unused]] auto st =
_load_block_queue->remove_load_id(p._load_id);
}
if
(ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
std ::chrono ::seconds(60)) == std ::future_status
::ready) {
@@ -304,7 +309,7 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState*
state, vectorized::Bloc
RETURN_IF_ERROR(local_state._add_blocks(state, true));
}
local_state._remove_estimated_wal_bytes();
- local_state._load_block_queue->remove_load_id(_load_id);
+ [[maybe_unused]] auto st =
local_state._load_block_queue->remove_load_id(_load_id);
}
return Status::OK();
};
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 464f9f51221..54f25a708a4 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -112,58 +112,48 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
*find_block = false;
*eos = false;
std::unique_lock l(mutex);
- if (!_need_commit) {
- if
(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
-
- _start_time)
- .count() >= _group_commit_interval_ms) {
- _need_commit = true;
- }
+ if (runtime_state->is_cancelled() || !status.ok()) {
+ auto st = runtime_state->cancel_reason();
+ _cancel_without_lock(st);
+ return status;
}
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
- if (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty()
&& !_need_commit) {
- if (_group_commit_interval_ms - duration <= 0) {
- _need_commit = true;
- } else {
- get_block_dep->block();
- return Status::OK();
+ if (!_need_commit && duration >= _group_commit_interval_ms) {
+ _need_commit = true;
+ }
+ auto get_load_ids = [&]() {
+ std::stringstream ss;
+ ss << "[";
+ for (auto& id : _load_ids_to_write_dep) {
+ ss << id.first.to_string() << ", ";
}
- } else if (!runtime_state->is_cancelled() && status.ok() &&
_block_queue.empty() &&
- _need_commit && !_load_ids_to_write_dep.empty()) {
- if (duration >= 10 * _group_commit_interval_ms) {
- std::stringstream ss;
- ss << "[";
- for (auto& id : _load_ids_to_write_dep) {
- ss << id.first.to_string() << ", ";
+ ss << "]";
+ return ss.str();
+ };
+ if (_block_queue.empty()) {
+ if (_need_commit && duration >= 10 * _group_commit_interval_ms) {
+ auto last_print_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(
+
std::chrono::steady_clock::now() - _last_print_time)
+ .count();
+ if (last_print_duration >= 5000) {
+ _last_print_time = std::chrono::steady_clock::now();
+ LOG(INFO) << "find one group_commit need to commit, txn_id="
<< txn_id
+ << ", label=" << label << ", instance_id=" <<
load_instance_id
+ << ", duration=" << duration << ", load_ids=" <<
get_load_ids();
}
- ss << "]";
- LOG(INFO) << "find one group_commit need to commit, txn_id=" <<
txn_id
- << ", label=" << label << ", instance_id=" <<
load_instance_id
- << ", duration=" << duration << ", load_ids=" << ss.str()
- << ", runtime_state=" << runtime_state;
}
- get_block_dep->block();
- return Status::OK();
- }
- if (runtime_state->is_cancelled()) {
- auto st = runtime_state->cancel_reason();
- _cancel_without_lock(st);
- return status;
- }
- if (!_block_queue.empty()) {
+ if (!_load_ids_to_write_dep.empty()) {
+ get_block_dep->block();
+ }
+ } else {
const BlockData block_data = _block_queue.front();
block->swap(*block_data.block);
*find_block = true;
_block_queue.pop_front();
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_sub(block_data.block_bytes,
std::memory_order_relaxed);
- std::stringstream ss;
- ss << "[";
- for (const auto& id : _load_ids_to_write_dep) {
- ss << id.first.to_string() << ", ";
- }
- ss << "]";
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
<< "block queue size is " << _block_queue.size() << ",
block rows is "
<< block->rows() << ", block bytes is " << block->bytes()
@@ -172,9 +162,8 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
<< ", after remove block, all block queues bytes is "
<< _all_block_queues_bytes->load() << ", txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" <<
load_instance_id
- << ", load_ids=" << ss.str() << ", runtime_state=" <<
runtime_state
- << ", the block is " << block->dump_data() << ", the block
column size is "
- << block->columns_bytes();
+ << ", load_ids=" << get_load_ids() << ", the block is " <<
block->dump_data()
+ << ", the block column size is " << block->columns_bytes();
}
if (_block_queue.empty() && _need_commit &&
_load_ids_to_write_dep.empty()) {
*eos = true;
@@ -190,7 +179,7 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
return Status::OK();
}
-void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
+Status LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
std::unique_lock l(mutex);
if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) {
_load_ids_to_write_dep[load_id]->set_always_ready();
@@ -198,7 +187,15 @@ void LoadBlockQueue::remove_load_id(const UniqueId&
load_id) {
for (auto read_dep : _read_deps) {
read_dep->set_ready();
}
+ return Status::OK();
}
+ return Status::NotFound<false>("load_id=" + load_id.to_string() +
+ " not in block queue, label=" + label);
+}
+
+bool LoadBlockQueue::contain_load_id(const UniqueId& load_id) {
+ std::unique_lock l(mutex);
+ return _load_ids_to_write_dep.find(load_id) !=
_load_ids_to_write_dep.end();
}
Status LoadBlockQueue::add_load_id(const UniqueId& load_id,
@@ -250,6 +247,9 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st)
{
for (auto& id : _load_ids_to_write_dep) {
id.second->set_always_ready();
}
+ for (auto read_dep : _read_deps) {
+ read_dep->set_ready();
+ }
}
Status GroupCommitTable::get_first_block_load_queue(
@@ -261,6 +261,14 @@ Status GroupCommitTable::get_first_block_load_queue(
DCHECK(table_id == _table_id);
std::unique_lock l(_lock);
auto try_to_get_matched_queue = [&]() -> Status {
+ for (const auto& [_, inner_block_queue] : _load_block_queues) {
+ if (inner_block_queue->contain_load_id(load_id)) {
+ load_block_queue = inner_block_queue;
+ label = inner_block_queue->label;
+ txn_id = inner_block_queue->txn_id;
+ return Status::OK();
+ }
+ }
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (!inner_block_queue->need_commit()) {
if (base_schema_version == inner_block_queue->schema_version) {
@@ -285,28 +293,38 @@ Status GroupCommitTable::get_first_block_load_queue(
return Status::OK();
}
create_plan_dep->block();
- _create_plan_deps.push_back(create_plan_dep);
+ _create_plan_deps.emplace(load_id,
+ std::make_tuple(create_plan_dep, put_block_dep,
base_schema_version));
if (!_is_creating_plan_fragment) {
_is_creating_plan_fragment = true;
- RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version,
mem_tracker,
- dep = create_plan_dep] {
- Defer defer {[&, dep = dep]() {
- std::unique_lock l(_lock);
- for (auto it : _create_plan_deps) {
- it->set_ready();
- }
- std::vector<std::shared_ptr<pipeline::Dependency>>
{}.swap(_create_plan_deps);
- _is_creating_plan_fragment = false;
- }};
- auto st = _create_group_commit_load(be_exe_version, mem_tracker);
- if (!st.ok()) {
- LOG(WARNING) << "create group commit load error, st=" <<
st.to_string();
- }
- }));
+ RETURN_IF_ERROR(
+ _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep
= create_plan_dep] {
+ Defer defer {[&, dep = dep]() {
+ std::unique_lock l(_lock);
+ for (auto it : _create_plan_deps) {
+ std::get<0>(it.second)->set_ready();
+ }
+ _create_plan_deps.clear();
+ _is_creating_plan_fragment = false;
+ }};
+ auto st = _create_group_commit_load(be_exe_version,
mem_tracker);
+ if (!st.ok()) {
+ LOG(WARNING) << "create group commit load error, st="
<< st.to_string();
+ }
+ }));
}
return try_to_get_matched_queue();
}
+void GroupCommitTable::remove_load_id(const UniqueId& load_id) {
+ std::unique_lock l(_lock);
+ for (const auto& [_, inner_block_queue] : _load_block_queues) {
+ if (inner_block_queue->remove_load_id(load_id).ok()) {
+ return;
+ }
+ }
+}
+
Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
Status st = Status::OK();
@@ -378,6 +396,21 @@ Status GroupCommitTable::_create_group_commit_load(int
be_exe_version,
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
be_exe_version));
_load_block_queues.emplace(instance_id, load_block_queue);
+
+ std::vector<UniqueId> success_load_ids;
+ for (const auto& [id, load_info] : _create_plan_deps) {
+ auto create_dep = std::get<0>(load_info);
+ auto put_dep = std::get<1>(load_info);
+ if (load_block_queue->schema_version ==
std::get<2>(load_info)) {
+ if (load_block_queue->add_load_id(id, put_dep).ok()) {
+ create_dep->set_ready();
+ success_load_ids.emplace_back(id);
+ }
+ }
+ }
+ for (const auto& id2 : success_load_ids) {
+ _create_plan_deps.erase(id2);
+ }
}
}
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id,
result.pipeline_params);
@@ -596,6 +629,13 @@ Status GroupCommitMgr::get_load_block_queue(int64_t
table_id, const TUniqueId& i
return group_commit_table->get_load_block_queue(instance_id,
load_block_queue, get_block_dep);
}
+void GroupCommitMgr::remove_load_id(int64_t table_id, const UniqueId& load_id)
{
+ std::lock_guard wlock(_lock);
+ if (_table_map.find(table_id) != _table_map.end()) {
+ _table_map.find(table_id)->second->remove_load_id(load_id);
+ }
+}
+
Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager*
wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int
be_exe_version) {
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index e9ea152ea5c..16c7e0c24d3 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -66,6 +66,7 @@ public:
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
_group_commit_interval_ms(group_commit_interval_ms),
_start_time(std::chrono::steady_clock::now()),
+ _last_print_time(_start_time),
_group_commit_data_bytes(group_commit_data_bytes),
_all_block_queues_bytes(all_block_queues_bytes) {};
@@ -73,9 +74,10 @@ public:
bool write_wal, UniqueId& load_id);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
bool* find_block,
bool* eos, std::shared_ptr<pipeline::Dependency>
get_block_dep);
+ bool contain_load_id(const UniqueId& load_id);
Status add_load_id(const UniqueId& load_id,
const std::shared_ptr<pipeline::Dependency>
put_block_dep);
- void remove_load_id(const UniqueId& load_id);
+ Status remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
bool need_commit() { return _need_commit; }
@@ -133,6 +135,7 @@ private:
// commit by time interval, can be changed by 'ALTER TABLE my_table SET
("group_commit_interval_ms"="1000");'
int64_t _group_commit_interval_ms;
std::chrono::steady_clock::time_point _start_time;
+ std::chrono::steady_clock::time_point _last_print_time;
// commit by data size
int64_t _group_commit_data_bytes;
int64_t _data_bytes = 0;
@@ -140,8 +143,6 @@ private:
// memory back pressure, memory consumption of all tables' load block
queues
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
std::condition_variable _get_cond;
- static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIME = 1000; // 1s
- static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIMEOUT = 120000; // 120s
};
class GroupCommitTable {
@@ -164,6 +165,7 @@ public:
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
std::shared_ptr<pipeline::Dependency>
get_block_dep);
+ void remove_load_id(const UniqueId& load_id);
private:
Status _create_group_commit_load(int be_exe_version,
@@ -186,7 +188,10 @@ private:
// fragment_instance_id to load_block_queue
std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>>
_load_block_queues;
bool _is_creating_plan_fragment = false;
- std::vector<std::shared_ptr<pipeline::Dependency>> _create_plan_deps;
+ // user_load_id -> <create_plan_dep, put_block_dep, base_schema_version>
+ std::unordered_map<UniqueId,
std::tuple<std::shared_ptr<pipeline::Dependency>,
+
std::shared_ptr<pipeline::Dependency>, int64_t>>
+ _create_plan_deps;
};
class GroupCommitMgr {
@@ -208,6 +213,7 @@ public:
std::shared_ptr<pipeline::Dependency>
create_plan_dep,
std::shared_ptr<pipeline::Dependency>
put_block_dep,
std::string& label, int64_t& txn_id);
+ void remove_load_id(int64_t table_id, const UniqueId& load_id);
std::promise<Status> debug_promise;
std::future<Status> debug_future = debug_promise.get_future();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]