This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 08dedc8e3bf [Improvement](load) Do not block in group commit sink (#36717)
08dedc8e3bf is described below
commit 08dedc8e3bf17920cbf5d52ee64b943c58f2dd85
Author: Gabriel <[email protected]>
AuthorDate: Mon Jun 24 12:19:07 2024 +0800
    [Improvement](load) Do not block in group commit sink (#36717)

    Do not rely on a condition variable in the group commit sink; block and
    release the sink through pipeline dependencies instead.
---
.../exec/group_commit_block_sink_operator.cpp | 23 ++---
.../exec/group_commit_block_sink_operator.h | 7 +-
be/src/runtime/group_commit_mgr.cpp | 102 +++++++++++----------
be/src/runtime/group_commit_mgr.h | 14 +--
4 files changed, 78 insertions(+), 68 deletions(-)
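Mechanically, the change removes the producer-side wait on _put_cond in
LoadBlockQueue::add_block and instead tracks one pipeline::Dependency per load
id: add_block blocks the caller's dependency when _all_block_queues_bytes
exceeds config::group_commit_queue_mem_limit, get_block sets the dependencies
ready again once consumption drops below the limit, and cancel/remove_load_id
mark them always ready. The stand-alone sketch below only illustrates that
block/set_ready pattern under simplified assumptions; the Dependency and
LoadQueue types, the member names, and the 64 MB limit are illustrative
stand-ins, not the actual Doris classes.

    #include <atomic>
    #include <cstddef>
    #include <memory>
    #include <mutex>
    #include <unordered_map>

    // Stand-in for pipeline::Dependency: the pipeline engine polls ready()
    // instead of parking a worker thread on a condition variable.
    struct Dependency {
        void block() { _ready.store(false); }
        void set_ready() { _ready.store(true); }
        bool ready() const { return _ready.load(); }
    private:
        std::atomic<bool> _ready{true};
    };

    // Stand-in for LoadBlockQueue, reduced to the back-pressure bookkeeping.
    class LoadQueue {
    public:
        void add_load(int load_id, std::shared_ptr<Dependency> dep) {
            std::lock_guard<std::mutex> l(_mutex);
            _write_deps[load_id] = std::move(dep);
        }
        // Producer side: record the bytes and, if over the limit, block the
        // caller's dependency instead of waiting on a condition variable.
        void add_block(int load_id, size_t bytes) {
            std::lock_guard<std::mutex> l(_mutex);
            _queued_bytes += bytes;
            if (_queued_bytes >= _mem_limit) {
                _write_deps[load_id]->block();
            }
        }
        // Consumer side: after draining data, mark every producer ready once
        // memory consumption is back under the limit.
        void get_block(size_t bytes) {
            std::lock_guard<std::mutex> l(_mutex);
            _queued_bytes -= bytes;
            if (_queued_bytes < _mem_limit) {
                for (auto& entry : _write_deps) {
                    entry.second->set_ready();
                }
            }
        }
    private:
        std::mutex _mutex;
        size_t _queued_bytes = 0;
        const size_t _mem_limit = 64UL << 20; // assumed limit, sketch only
        std::unordered_map<int, std::shared_ptr<Dependency>> _write_deps;
    };

    int main() {
        LoadQueue q;
        auto dep = std::make_shared<Dependency>();
        q.add_load(1, dep);
        q.add_block(1, 80UL << 20);  // over the limit: the sink is blocked
        bool was_blocked = !dep->ready();
        q.get_block(80UL << 20);     // consumer drains: the sink is runnable
        return (was_blocked && dep->ready()) ? 0 : 1;
    }

The point of the pattern is that back pressure no longer parks a pipeline
worker thread in wait_for; the blocked sink simply becomes non-runnable until
the consumer releases its dependency, as the hunks below show.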
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 3953eb63c4d..402354d6f24 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -52,22 +52,21 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) {
     for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
     }
-    _write_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
-                                                  "GroupCommitBlockSinkDependency", true);
-
+    _create_plan_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
+                                                        "CreateGroupCommitPlanDependency", true);
+    _put_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
+                                                      "GroupCommitPutBlockDependency", true);
     WARN_IF_ERROR(_initialize_load_queue(), "");
     return Status::OK();
 }
 
 Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
     auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
-    TUniqueId load_id;
-    load_id.__set_hi(p._load_id.hi);
-    load_id.__set_lo(p._load_id.lo);
     if (_state->exec_env()->wal_mgr()->is_running()) {
         RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
-                p._db_id, p._table_id, p._base_schema_version, load_id, _load_block_queue,
-                _state->be_exec_version(), _state->query_mem_tracker(), _write_dependency));
+                p._db_id, p._table_id, p._base_schema_version, p._load_id, _load_block_queue,
+                _state->be_exec_version(), _state->query_mem_tracker(), _create_plan_dependency,
+                _put_block_dependency));
         return Status::OK();
     } else {
         return Status::InternalError("be is stopping");
@@ -138,7 +137,8 @@ Status GroupCommitBlockSinkLocalState::_add_block(RuntimeState* state,
             RETURN_IF_ERROR(_add_blocks(state, false));
         }
         RETURN_IF_ERROR(_load_block_queue->add_block(
-                state, output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
+                state, output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE,
+                _parent->cast<GroupCommitBlockSinkOperatorX>()._load_id));
     }
     return Status::OK();
 }
@@ -181,9 +181,6 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
                                                    bool is_blocks_contain_all_load_data) {
     DCHECK(_is_block_appended == false);
     auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
-    TUniqueId load_id;
-    load_id.__set_hi(p._load_id.hi);
-    load_id.__set_lo(p._load_id.lo);
     if (_state->exec_env()->wal_mgr()->is_running()) {
         if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
             size_t estimated_wal_bytes =
@@ -212,7 +209,7 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
         }
         for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
             RETURN_IF_ERROR(_load_block_queue->add_block(
-                    state, *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
+                    state, *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE, p._load_id));
         }
         _is_block_appended = true;
         _blocks.clear();
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index 27e344deca6..caf7017d050 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -46,7 +46,9 @@ public:
     Status close(RuntimeState* state, Status exec_status) override;
 
     Dependency* finishdependency() override { return _finish_dependency.get(); }
-    std::vector<Dependency*> dependencies() const override { return {_write_dependency.get()}; }
+    std::vector<Dependency*> dependencies() const override {
+        return {_create_plan_dependency.get(), _put_block_dependency.get()};
+    }
     std::string debug_string(int indentation_level) const override;
 
 private:
@@ -75,7 +77,8 @@ private:
     Bitmap _filter_bitmap;
     int64_t _table_id;
     std::shared_ptr<Dependency> _finish_dependency;
-    std::shared_ptr<Dependency> _write_dependency = nullptr;
+    std::shared_ptr<Dependency> _create_plan_dependency = nullptr;
+    std::shared_ptr<Dependency> _put_block_dependency = nullptr;
 };
 
 class GroupCommitBlockSinkOperatorX final
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index d5e2651fd4d..7a17fd88939 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -35,28 +35,14 @@
 namespace doris {
 
 Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
-                                 std::shared_ptr<vectorized::Block> block, bool write_wal) {
+                                 std::shared_ptr<vectorized::Block> block, bool write_wal,
+                                 UniqueId& load_id) {
     std::unique_lock l(mutex);
     RETURN_IF_ERROR(status);
     auto start = std::chrono::steady_clock::now();
     DBUG_EXECUTE_IF("LoadBlockQueue.add_block.back_pressure_time_out", {
         start = std::chrono::steady_clock::now() - std::chrono::milliseconds(120000);
     });
-    while (!runtime_state->is_cancelled() && status.ok() &&
-           _all_block_queues_bytes->load(std::memory_order_relaxed) >=
-                   config::group_commit_queue_mem_limit) {
-        _put_cond.wait_for(l,
-                           std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME));
-        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
-                std::chrono::steady_clock::now() - start);
-        if (duration.count() > LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIMEOUT) {
-            return Status::TimedOut<false>(
-                    "Wal memory back pressure wait too much time! Load block queue txn id: {}, "
-                    "label: {}, instance id: {}, consumed memory: {}",
-                    txn_id, label, load_instance_id.to_string(),
-                    _all_block_queues_bytes->load(std::memory_order_relaxed));
-        }
-    }
     if (UNLIKELY(runtime_state->is_cancelled())) {
         return runtime_state->cancel_reason();
     }
@@ -69,8 +55,8 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
         _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
         std::stringstream ss;
         ss << "[";
-        for (const auto& id : _load_ids) {
-            ss << id.to_string() << ", ";
+        for (const auto& id : _load_ids_to_write_dep) {
+            ss << id.first.to_string() << ", ";
         }
         ss << "]";
         VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
@@ -92,6 +78,12 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
                 return st;
             }
         }
+        if (!runtime_state->is_cancelled() && status.ok() &&
+            _all_block_queues_bytes->load(std::memory_order_relaxed) >=
+                    config::group_commit_queue_mem_limit) {
+            DCHECK(_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end());
+            _load_ids_to_write_dep[load_id]->block();
+        }
     }
     if (!_need_commit) {
         if (_data_bytes >= _group_commit_data_bytes) {
@@ -125,7 +117,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
         }
     }
     while (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() &&
-           (!_need_commit || (_need_commit && !_load_ids.empty()))) {
+           (!_need_commit || (_need_commit && !_load_ids_to_write_dep.empty()))) {
         auto left_milliseconds = _group_commit_interval_ms;
         auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                                 std::chrono::steady_clock::now() - _start_time)
@@ -140,8 +132,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
         if (duration >= 10 * _group_commit_interval_ms) {
             std::stringstream ss;
             ss << "[";
-            for (auto& id : _load_ids) {
-                ss << id.to_string() << ", ";
+            for (auto& id : _load_ids_to_write_dep) {
+                ss << id.first.to_string() << ", ";
             }
             ss << "]";
             LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
@@ -167,8 +159,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
         _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
         std::stringstream ss;
         ss << "[";
-        for (const auto& id : _load_ids) {
-            ss << id.to_string() << ", ";
+        for (const auto& id : _load_ids_to_write_dep) {
+            ss << id.first.to_string() << ", ";
         }
         ss << "]";
         VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
@@ -183,30 +175,37 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
                    << ", the block is " << block->dump_data() << ", the block column size is "
                    << block->columns_bytes();
     }
-    if (_block_queue.empty() && _need_commit && _load_ids.empty()) {
+    if (_block_queue.empty() && _need_commit && _load_ids_to_write_dep.empty()) {
         *eos = true;
     } else {
         *eos = false;
     }
-    _put_cond.notify_all();
+    if (_all_block_queues_bytes->load(std::memory_order_relaxed) <
+        config::group_commit_queue_mem_limit) {
+        for (auto& id : _load_ids_to_write_dep) {
+            id.second->set_ready();
+        }
+    }
     return Status::OK();
 }
 
 void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
     std::unique_lock l(mutex);
-    if (_load_ids.find(load_id) != _load_ids.end()) {
-        _load_ids.erase(load_id);
+    if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) {
+        _load_ids_to_write_dep[load_id]->set_always_ready();
+        _load_ids_to_write_dep.erase(load_id);
         _get_cond.notify_all();
     }
 }
 
-Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
+Status LoadBlockQueue::add_load_id(const UniqueId& load_id,
+                                   const std::shared_ptr<pipeline::Dependency> put_block_dep) {
     std::unique_lock l(mutex);
     if (_need_commit) {
         return Status::InternalError<false>("block queue is set need commit, id=" +
                                             load_instance_id.to_string());
     }
-    _load_ids.emplace(load_id);
+    _load_ids_to_write_dep[load_id] = put_block_dep;
     group_commit_load_count.fetch_add(1);
     return Status::OK();
 }
@@ -228,8 +227,8 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
         _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
         std::stringstream ss;
         ss << "[";
-        for (const auto& id : _load_ids) {
-            ss << id.to_string() << ", ";
+        for (const auto& id : _load_ids_to_write_dep) {
+            ss << id.first.to_string() << ", ";
        }
         ss << "]";
         VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). "
@@ -245,20 +244,26 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
                    << block_data.block->columns_bytes();
         _block_queue.pop_front();
     }
+    for (auto& id : _load_ids_to_write_dep) {
+        id.second->set_always_ready();
+    }
 }
 
 Status GroupCommitTable::get_first_block_load_queue(
         int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
-        std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> dep) {
+        std::shared_ptr<MemTrackerLimiter> mem_tracker,
+        std::shared_ptr<pipeline::Dependency> create_plan_dep,
+        std::shared_ptr<pipeline::Dependency> put_block_dep) {
     DCHECK(table_id == _table_id);
     std::unique_lock l(_lock);
     auto try_to_get_matched_queue = [&]() -> Status {
         for (const auto& [_, inner_block_queue] : _load_block_queues) {
             if (!inner_block_queue->need_commit()) {
                 if (base_schema_version == inner_block_queue->schema_version) {
-                    if (inner_block_queue->add_load_id(load_id).ok()) {
+                    if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) {
                         load_block_queue = inner_block_queue;
+
                         return Status::OK();
                     }
                 } else {
@@ -278,18 +283,19 @@ Status GroupCommitTable::get_first_block_load_queue(
     }
     if (!_is_creating_plan_fragment) {
         _is_creating_plan_fragment = true;
-        dep->block();
-        RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = dep] {
-            Defer defer {[&, dep = dep]() {
-                dep->set_ready();
-                std::unique_lock l(_lock);
-                _is_creating_plan_fragment = false;
-            }};
-            auto st = _create_group_commit_load(be_exe_version, mem_tracker);
-            if (!st.ok()) {
-                LOG(WARNING) << "create group commit load error, st=" << st.to_string();
-            }
-        }));
+        create_plan_dep->block();
+        RETURN_IF_ERROR(
+                _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = create_plan_dep] {
+                    Defer defer {[&, dep = dep]() {
+                        dep->set_ready();
+                        std::unique_lock l(_lock);
+                        _is_creating_plan_fragment = false;
+                    }};
+                    auto st = _create_group_commit_load(be_exe_version, mem_tracker);
+                    if (!st.ok()) {
+                        LOG(WARNING) << "create group commit load error, st=" << st.to_string();
+                    }
+                }));
     }
     return try_to_get_matched_queue();
 }
@@ -568,7 +574,9 @@ void GroupCommitMgr::stop() {
 Status GroupCommitMgr::get_first_block_load_queue(
         int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
-        std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> dep) {
+        std::shared_ptr<MemTrackerLimiter> mem_tracker,
+        std::shared_ptr<pipeline::Dependency> create_plan_dep,
+        std::shared_ptr<pipeline::Dependency> put_block_dep) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
     {
         std::lock_guard wlock(_lock);
@@ -581,7 +589,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
     }
     RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
             table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker,
-            dep));
+            create_plan_dep, put_block_dep));
     return Status::OK();
 }
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 679da81f75f..f290d2aa6bb 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -70,10 +70,11 @@ public:
               _all_block_queues_bytes(all_block_queues_bytes) {};
 
     Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block,
-                     bool write_wal);
+                     bool write_wal, UniqueId& load_id);
     Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
                      bool* eos);
-    Status add_load_id(const UniqueId& load_id);
+    Status add_load_id(const UniqueId& load_id,
+                       const std::shared_ptr<pipeline::Dependency> put_block_dep);
     void remove_load_id(const UniqueId& load_id);
     void cancel(const Status& st);
     bool need_commit() { return _need_commit; }
@@ -118,7 +119,7 @@ private:
     void _cancel_without_lock(const Status& st);
 
     // the set of load ids of all blocks in this queue
-    std::set<UniqueId> _load_ids;
+    std::map<UniqueId, std::shared_ptr<pipeline::Dependency>> _load_ids_to_write_dep;
     std::list<BlockData> _block_queue;
 
     // wal
@@ -136,7 +137,6 @@ private:
 
     // memory back pressure, memory consumption of all tables' load block queues
     std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
-    std::condition_variable _put_cond;
     std::condition_variable _get_cond;
     static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIME = 1000;      // 1s
    static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIMEOUT = 120000; // 120s
@@ -156,7 +156,8 @@ public:
                                       std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                       int be_exe_version,
                                       std::shared_ptr<MemTrackerLimiter> mem_tracker,
-                                      std::shared_ptr<pipeline::Dependency> dep);
+                                      std::shared_ptr<pipeline::Dependency> create_plan_dep,
+                                      std::shared_ptr<pipeline::Dependency> put_block_dep);
     Status get_load_block_queue(const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& load_block_queue);
@@ -200,7 +201,8 @@ public:
                                       std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                       int be_exe_version,
                                       std::shared_ptr<MemTrackerLimiter> mem_tracker,
-                                      std::shared_ptr<pipeline::Dependency> dep);
+                                      std::shared_ptr<pipeline::Dependency> create_plan_dep,
+                                      std::shared_ptr<pipeline::Dependency> put_block_dep);
     std::promise<Status> debug_promise;
     std::future<Status> debug_future = debug_promise.get_future();