This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c056601d14 [Enhancement](group commit) Add bvar and log for group 
commit (#31017)
7c056601d14 is described below

commit 7c056601d1474a332b142fa8fc142c2f2e4507f7
Author: abmdocrt <[email protected]>
AuthorDate: Wed Feb 21 20:29:25 2024 +0800

    [Enhancement](group commit) Add bvar and log for group commit (#31017)
---
 be/src/runtime/group_commit_mgr.cpp         | 4 ++++
 be/src/runtime/group_commit_mgr.h           | 4 ++++
 be/src/vec/sink/group_commit_block_sink.cpp | 8 ++++++--
 3 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index 7422890aa41..42c0e83f4b6 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -78,6 +78,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
         VLOG_DEBUG << "group commit meets commit condition for data size, 
label=" << label
                    << ", instance_id=" << load_instance_id << ", data_bytes=" 
<< _data_bytes;
         _need_commit = true;
+        data_size_condition = true;
     }
     _get_cond.notify_all();
     return Status::OK();
@@ -417,6 +418,9 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
        << ", exec_plan_fragment status=" << status.to_string()
        << ", commit/abort txn rpc status=" << st.to_string()
        << ", commit/abort txn status=" << result_status.to_string()
+       << ", this group commit includes " << 
load_block_queue->group_commit_load_count << " loads"
+       << ", flush because meet "
+       << (load_block_queue->data_size_condition ? "data size " : "time ") << 
"condition"
        << ", wal space info:" << 
ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
     if (state) {
         if (!state->get_error_log_file_path().empty()) {
diff --git a/be/src/runtime/group_commit_mgr.h 
b/be/src/runtime/group_commit_mgr.h
index e3b28be5804..5357ba208f7 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -78,6 +78,10 @@ public:
     int64_t txn_id;
     int64_t schema_version;
     bool wait_internal_group_commit_finish = false;
+    bool data_size_condition = false;
+
+    // counts of load in one group commit
+    std::atomic_size_t group_commit_load_count = 0;
 
     // the execute status of this internal group commit
     std::mutex mutex;
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp 
b/be/src/vec/sink/group_commit_block_sink.cpp
index 6ff9d4a1425..2ce03ba0c1f 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -35,6 +35,8 @@
 namespace doris {
 
 namespace vectorized {
+bvar::Adder<int64_t> g_group_commit_load_rows("doris_group_commit_load_rows");
+bvar::Adder<int64_t> 
g_group_commit_load_bytes("doris_group_commit_load_bytes");
 
 GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const 
RowDescriptor& row_desc,
                                            const std::vector<TExpr>& texprs, 
Status* status)
@@ -48,6 +50,7 @@ GroupCommitBlockSink::~GroupCommitBlockSink() {
     if (_load_block_queue) {
         _remove_estimated_wal_bytes();
         _load_block_queue->remove_load_id(_load_id);
+        _load_block_queue->group_commit_load_count.fetch_add(1);
     }
 }
 
@@ -144,12 +147,13 @@ Status GroupCommitBlockSink::send(RuntimeState* state, 
vectorized::Block* input_
         return status;
     }
     SCOPED_TIMER(_profile->total_time_counter());
+
     // update incrementally so that FE can get the progress.
     // the real 'num_rows_load_total' will be set when sink being closed.
     state->update_num_rows_load_total(rows);
     state->update_num_bytes_load_total(bytes);
-    DorisMetrics::instance()->load_rows->increment(rows);
-    DorisMetrics::instance()->load_bytes->increment(bytes);
+    g_group_commit_load_rows << rows;
+    g_group_commit_load_bytes << bytes;
 
     std::shared_ptr<vectorized::Block> block;
     bool has_filtered_rows = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to