This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7c056601d14 [Enhancement](group commit) Add bvar and log for group commit (#31017)
7c056601d14 is described below
commit 7c056601d1474a332b142fa8fc142c2f2e4507f7
Author: abmdocrt <[email protected]>
AuthorDate: Wed Feb 21 20:29:25 2024 +0800
[Enhancement](group commit) Add bvar and log for group commit (#31017)
---
be/src/runtime/group_commit_mgr.cpp | 4 ++++
be/src/runtime/group_commit_mgr.h | 4 ++++
be/src/vec/sink/group_commit_block_sink.cpp | 8 ++++++--
3 files changed, 14 insertions(+), 2 deletions(-)
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 7422890aa41..42c0e83f4b6 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -78,6 +78,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label
<< ", instance_id=" << load_instance_id << ", data_bytes="
<< _data_bytes;
_need_commit = true;
+ data_size_condition = true;
}
_get_cond.notify_all();
return Status::OK();
@@ -417,6 +418,9 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
<< ", exec_plan_fragment status=" << status.to_string()
<< ", commit/abort txn rpc status=" << st.to_string()
<< ", commit/abort txn status=" << result_status.to_string()
+ << ", this group commit includes " << load_block_queue->group_commit_load_count << " loads"
+ << ", flush because meet "
+ << (load_block_queue->data_size_condition ? "data size " : "time ") << "condition"
<< ", wal space info:" << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
if (state) {
if (!state->get_error_log_file_path().empty()) {
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index e3b28be5804..5357ba208f7 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -78,6 +78,10 @@ public:
int64_t txn_id;
int64_t schema_version;
bool wait_internal_group_commit_finish = false;
+ bool data_size_condition = false;
+
+ // counts of load in one group commit
+ std::atomic_size_t group_commit_load_count = 0;
// the execute status of this internal group commit
std::mutex mutex;
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index 6ff9d4a1425..2ce03ba0c1f 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -35,6 +35,8 @@
namespace doris {
namespace vectorized {
+bvar::Adder<int64_t> g_group_commit_load_rows("doris_group_commit_load_rows");
+bvar::Adder<int64_t> g_group_commit_load_bytes("doris_group_commit_load_bytes");
GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs,
Status* status)
@@ -48,6 +50,7 @@ GroupCommitBlockSink::~GroupCommitBlockSink() {
if (_load_block_queue) {
_remove_estimated_wal_bytes();
_load_block_queue->remove_load_id(_load_id);
+ _load_block_queue->group_commit_load_count.fetch_add(1);
}
}
@@ -144,12 +147,13 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_
return status;
}
SCOPED_TIMER(_profile->total_time_counter());
+
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
state->update_num_rows_load_total(rows);
state->update_num_bytes_load_total(bytes);
- DorisMetrics::instance()->load_rows->increment(rows);
- DorisMetrics::instance()->load_bytes->increment(bytes);
+ g_group_commit_load_rows << rows;
+ g_group_commit_load_bytes << bytes;
std::shared_ptr<vectorized::Block> block;
bool has_filtered_rows = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]