This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cec69f4cbd3 [Enhancement](wal) Add timout for wal memory back pressure
(#29178)
cec69f4cbd3 is described below
commit cec69f4cbd3be1d4ce67faa820e3b4190b670a3c
Author: abmdocrt <[email protected]>
AuthorDate: Tue Jan 2 11:02:17 2024 +0800
[Enhancement](wal) Add timout for wal memory back pressure (#29178)
---
be/src/common/config.cpp | 2 +-
be/src/common/config.h | 2 +-
be/src/runtime/group_commit_mgr.cpp | 22 +++++++++++++++++++---
be/src/runtime/group_commit_mgr.h | 6 +++++-
be/src/vec/sink/group_commit_block_sink.cpp | 11 ++++++-----
be/src/vec/sink/group_commit_block_sink.h | 2 +-
6 files changed, 33 insertions(+), 12 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e94b4c6dadb..9b30ae145c4 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1121,7 +1121,7 @@ DEFINE_Int32(group_commit_insert_threads, "10");
DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000");
DEFINE_Bool(wait_internal_group_commit_finish, "false");
// Max size(bytes) of group commit queues, used for mem back pressure, default 64M.
-DEFINE_Int32(group_commit_max_queue_size, "67108864");
+DEFINE_Int32(group_commit_queue_mem_limit, "67108864");
// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
DEFINE_String(group_commit_wal_max_disk_limit, "10%");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2d257b49510..c340abe05f3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1187,7 +1187,7 @@ DECLARE_mInt32(group_commit_insert_threads);
DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio);
DECLARE_Bool(wait_internal_group_commit_finish);
// Max size(bytes) of group commit queues, used for mem back pressure.
-DECLARE_Int32(group_commit_max_queue_size);
+DECLARE_Int32(group_commit_queue_mem_limit);
// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
DECLARE_mString(group_commit_wal_max_disk_limit);
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 00b3a559fbf..b965efadd70 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -21,6 +21,7 @@
#include <glog/logging.h>
#include <atomic>
+#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
@@ -40,14 +41,29 @@
namespace doris {
-Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block,
bool write_wal) {
+Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
+ std::shared_ptr<vectorized::Block> block,
bool write_wal) {
std::unique_lock l(mutex);
RETURN_IF_ERROR(status);
- while (_all_block_queues_bytes->load(std::memory_order_relaxed) >
- config::group_commit_max_queue_size) {
+ auto start = std::chrono::steady_clock::now();
+ while (!runtime_state->is_cancelled() && status.ok() &&
+ _all_block_queues_bytes->load(std::memory_order_relaxed) >
+ config::group_commit_queue_mem_limit) {
_put_cond.wait_for(
l,
std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() - start);
+ if (duration.count() > LoadBlockQueue::WAL_MEM_BACK_PRESSURE_TIME_OUT)
{
+ return Status::TimedOut(
+ "Wal memory back pressure wait too much time! Load block queue txn id: {}, "
+ "label: {}, instance id: {}",
+ txn_id, label, load_instance_id.to_string());
+ }
}
+ if (runtime_state->is_cancelled()) {
+ return Status::Cancelled(runtime_state->cancel_reason());
+ }
+ RETURN_IF_ERROR(status);
if (block->rows() > 0) {
_block_queue.push_back(block);
if (write_wal) {
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index b0553b44876..03c917f8fcc 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -55,7 +55,8 @@ public:
_all_block_queues_bytes(all_block_queues_bytes),
_group_commit_interval_ms(group_commit_interval_ms) {};
- Status add_block(std::shared_ptr<vectorized::Block> block, bool write_wal);
+ Status add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block,
+ bool write_wal);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block,
bool* find_block,
bool* eos);
Status add_load_id(const UniqueId& load_id);
@@ -68,7 +69,10 @@ public:
bool has_enough_wal_disk_space(const
std::vector<std::shared_ptr<vectorized::Block>>& blocks,
const TUniqueId& load_id, bool
is_blocks_contain_all_load_data);
+ // 1s
static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
+ // 120s
+ static constexpr size_t WAL_MEM_BACK_PRESSURE_TIME_OUT = 120000;
UniqueId load_instance_id;
std::string label;
int64_t txn_id;
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp
b/be/src/vec/sink/group_commit_block_sink.cpp
index 56869665878..277b9859bd5 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -114,7 +114,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state,
Status close_status) {
(double)state->num_rows_load_filtered() / num_selected_rows >
_max_filter_ratio) {
return Status::DataQualityError("too many filtered rows");
}
- RETURN_IF_ERROR(_add_blocks(true));
+ RETURN_IF_ERROR(_add_blocks(state, true));
}
if (_load_block_queue) {
_load_block_queue->remove_load_id(_load_id);
@@ -220,15 +220,16 @@ Status GroupCommitBlockSink::_add_block(RuntimeState*
state,
_blocks.emplace_back(output_block);
} else {
if (!_is_block_appended) {
- RETURN_IF_ERROR(_add_blocks(false));
+ RETURN_IF_ERROR(_add_blocks(state, false));
}
RETURN_IF_ERROR(_load_block_queue->add_block(
- output_block, _group_commit_mode ==
TGroupCommitMode::ASYNC_MODE));
+ state, output_block, _group_commit_mode ==
TGroupCommitMode::ASYNC_MODE));
}
return Status::OK();
}
-Status GroupCommitBlockSink::_add_blocks(bool is_blocks_contain_all_load_data)
{
+Status GroupCommitBlockSink::_add_blocks(RuntimeState* state,
+ bool is_blocks_contain_all_load_data)
{
DCHECK(_is_block_appended == false);
TUniqueId load_id;
load_id.__set_hi(_load_id.hi);
@@ -257,7 +258,7 @@ Status GroupCommitBlockSink::_add_blocks(bool
is_blocks_contain_all_load_data) {
}
for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
RETURN_IF_ERROR(_load_block_queue->add_block(
- *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
+ state, *it, _group_commit_mode ==
TGroupCommitMode::ASYNC_MODE));
}
_is_block_appended = true;
_blocks.clear();
diff --git a/be/src/vec/sink/group_commit_block_sink.h
b/be/src/vec/sink/group_commit_block_sink.h
index 84ffebf8fe1..3db4bdd31f8 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -47,7 +47,7 @@ public:
private:
Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block>
block);
- Status _add_blocks(bool is_blocks_contain_all_load_data);
+ Status _add_blocks(RuntimeState* state, bool
is_blocks_contain_all_load_data);
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]