This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new cc9dc331a76 branch-3.1: [feat](txn) Add txn write amplification brpc 
metrics #55544 (#56832)
cc9dc331a76 is described below

commit cc9dc331a76cb3883bc3f4a36d1c2f4ddc489be5
Author: Yixuan Wang <[email protected]>
AuthorDate: Mon Oct 13 12:09:15 2025 +0800

    branch-3.1: [feat](txn) Add txn write amplification brpc metrics #55544 
(#56832)
    
    picked from #55544
---
 cloud/src/common/bvars.cpp                  |  10 +
 cloud/src/common/bvars.h                    | 358 ++++++++++++++++++++++++++++
 cloud/src/meta-service/meta_service.h       |   5 +
 cloud/src/meta-service/meta_service_txn.cpp |  26 ++
 cloud/test/CMakeLists.txt                   |   7 +
 cloud/test/bvars_test.cpp                   | 129 ++++++++++
 6 files changed, 535 insertions(+)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index a3448306110..c1812e47a64 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -17,7 +17,9 @@
 
 #include "common/bvars.h"
 
+#include <bvar/latency_recorder.h>
 #include <bvar/multi_dimension.h>
+#include <bvar/passive_status.h>
 #include <bvar/reducer.h>
 #include <bvar/status.h>
 #include <bvar/window.h>
@@ -25,6 +27,8 @@
 #include <cstdint>
 #include <stdexcept>
 
+#include "meta-store/txn_kv.h"
+
 // clang-format off
 
 // meta-service's bvars
@@ -97,6 +101,12 @@ bvar::Adder<int64_t> 
g_bvar_update_delete_bitmap_fail_counter;
 bvar::Window<bvar::Adder<int64_t> > 
g_bvar_update_delete_bitmap_fail_counter_minute("ms", 
"update_delete_bitmap_fail", &g_bvar_update_delete_bitmap_fail_counter, 60);
 bvar::Adder<int64_t> g_bvar_get_delete_bitmap_fail_counter;
 bvar::Window<bvar::Adder<int64_t> > 
g_bvar_get_delete_bitmap_fail_counter_minute("ms", "get_delete_bitmap_fail", 
&g_bvar_get_delete_bitmap_fail_counter, 60);
+BvarLatencyRecorderWithStatus<60> 
g_bvar_ms_txn_commit_with_partition_count("ms", 
"txn_commit_with_partition_count");
+BvarLatencyRecorderWithStatus<60> g_bvar_ms_txn_commit_with_tablet_count("ms", 
"txn_commit_with_tablet_count");
+
+MBvarLatencyRecorderWithStatus<60> 
g_bvar_instance_txn_commit_with_partition_count("instance", 
"txn_commit_with_partition_count", {"instance_id"});
+MBvarLatencyRecorderWithStatus<60> 
g_bvar_instance_txn_commit_with_tablet_count("instance", 
"txn_commit_with_tablet_count", {"instance_id"});
+
 
 // recycler's bvars
 // TODO: use mbvar for per instance, 
https://github.com/apache/brpc/blob/master/docs/cn/mbvar_c++.md
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index a3ec5a8d26a..13d50ddaa33 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -17,13 +17,20 @@
 
 #pragma once
 
+#include <aws/core/external/cjson/cJSON.h>
+#include <bthread/bthread.h>
 #include <bthread/mutex.h>
+#include <bthread/unstable.h>
 #include <bvar/bvar.h>
 #include <bvar/latency_recorder.h>
 #include <bvar/multi_dimension.h>
+#include <bvar/passive_status.h>
 #include <bvar/reducer.h>
 #include <bvar/status.h>
+#include <cpp/sync_point.h>
+#include <gmock/gmock-actions.h>
 
+#include <atomic>
 #include <cstdint>
 #include <initializer_list>
 #include <map>
@@ -33,6 +40,8 @@
 #include <type_traits>
 #include <utility>
 
+#include "common/logging.h"
+
 /**
  * Manage bvars that with similar names (identical prefix)
  * ${module}_${name}_${tag}
@@ -177,6 +186,351 @@ private:
 
     bvar::MultiDimension<BvarType> counter_;
 };
+/**
+ * @class BvarLatencyRecorderWithStatus
+ * @brief A latency recorder with auto-exposed max and average metrics
+ *
+ * This class wraps a bvar::LatencyRecorder and automatically creates two
+ * additional PassiveStatus metrics to expose the maximum and average latency.
+ * This makes it convenient to track these key metrics in monitoring systems.
+ * 
+ * @tparam N Window size in seconds for the latency recorder, defaults to 60 
seconds
+ *
+ * @note Unlike mBvarLatencyRecorderWithStatus, this class doesn't support 
multi-dimensional
+ * metrics and doesn't use a timer to update status. It uses PassiveStatus to 
calculate
+ * statistics in real-time when queried.
+ *
+ * @example Basic usage:
+ *   // Create a latency recorder
+ *   BvarLatencyRecorderWithStatus<> my_latency("my_service_latency");
+ *   // Record a latency value (in microseconds)
+ *   my_latency << 1500;  // or my_latency.put(1500);
+ *   // This will create three metrics:
+ *   // - The original latency recorder (hidden)
+ *   // - my_service_latency_max (showing maximum latency)
+ *   // - my_service_latency_avg (showing average latency)
+ */
+template <int N = 60>
+class BvarLatencyRecorderWithStatus {
+public:
+    /**
+     * @brief Constructor
+     * @param metric_name Base name for the metrics, _max and _avg suffixes 
will be added
+     */
+    BvarLatencyRecorderWithStatus(const std::string& metric_name)
+            : recorder_(N),
+              max_status_(metric_name + "_max", get_max_latency, this),
+              avg_status_(metric_name + "_avg", get_avg_latency, this),
+              count_status_(metric_name + "_count", get_count_latency, this) {
+        recorder_.hide();
+    }
+
+    /**
+     * @brief Constructor with prefix
+     * @param prefix Prefix for the metric name
+     * @param metric_name Base name for the metrics
+     */
+    BvarLatencyRecorderWithStatus(const std::string& prefix, const 
std::string& metric_name)
+            : BvarLatencyRecorderWithStatus(prefix + "_" + metric_name) {}
+
+    /**
+     * @brief Record a latency value
+     * @param value Latency value to record (in microseconds)
+     */
+    void put(int64_t value) { recorder_ << value; }
+
+    /**
+     * @brief Stream operator for recording latency values
+     * @param value Latency value to record (in microseconds)
+     */
+    void operator<<(int64_t value) { recorder_ << value; }
+
+    int64_t max() const { return recorder_.max_latency(); }
+
+    int64_t avg() const { return recorder_.latency(); }
+
+    int64_t count() const { return recorder_.count(); }
+
+private:
+    bvar::LatencyRecorder recorder_;            // The underlying latency 
recorder
+    bvar::PassiveStatus<int64_t> max_status_;   // Passive status for maximum 
latency
+    bvar::PassiveStatus<int64_t> avg_status_;   // Passive status for average 
latency
+    bvar::PassiveStatus<int64_t> count_status_; // Passive status for count 
latency
+
+    /**
+     * @brief Callback function to get maximum latency
+     * @param arg Pointer to the BvarLatencyRecorderWithStatus instance
+     * @return Maximum latency value, or 0 if negative
+     */
+    static int64_t get_max_latency(void* arg) {
+        auto* self = static_cast<BvarLatencyRecorderWithStatus*>(arg);
+        int64_t value = self->recorder_.max_latency();
+        return value >= 0 ? value : 0;
+    }
+
+    /**
+     * @brief Callback function to get average latency
+     * @param arg Pointer to the BvarLatencyRecorderWithStatus instance
+     * @return Average latency value, or 0 if negative
+     */
+    static int64_t get_avg_latency(void* arg) {
+        auto* self = static_cast<BvarLatencyRecorderWithStatus*>(arg);
+        int64_t value = self->recorder_.latency();
+        return value >= 0 ? value : 0;
+    }
+
+    /**
+     * @brief Callback function to get count latency
+     * @param arg Pointer to the BvarLatencyRecorderWithStatus instance
+     * @return Count latency value, or 0 if negative
+     */
+    static int64_t get_count_latency(void* arg) {
+        auto* self = static_cast<BvarLatencyRecorderWithStatus*>(arg);
+        int64_t value = self->recorder_.count();
+        return value >= 0 ? value : 0;
+    }
+};
+
+/**
+ * @class MBvarLatencyRecorderWithStatus
+ * @brief A multi-dimensional latency recorder with status metrics
+ * 
+ * This class provides a way to record latency metrics across multiple 
dimensions and
+ * automatically update status metrics (max and average) at regular intervals.
+ * It leverages bvar's MultiDimension capability to track metrics per 
dimension combination.
+ * 
+ * @tparam N Window size in seconds for the latency recorder (default: 60)
+ */
+template <int N = 60>
+class MBvarLatencyRecorderWithStatus {
+private:
+    /**
+    * @class ScheduledLatencyUpdater
+    * @brief A helper class to schedule deferred execution of tasks using 
bthread timer.
+    * 
+    * This class provides a way to execute a callback function after a 
specified time interval.
+    * It takes care of safely managing the lifecycle of the timer and ensures 
the callback
+    * is only executed if the timer and its arguments are still valid.
+    * 
+    * @note This class requires bthread to be initialized before use. If 
bthread is not
+    * initialized, timer creation will fail.
+    */
+    class ScheduledLatencyUpdater : public bvar::LatencyRecorder {
+    public:
+        /**
+        * @brief Constructor for ScheduledLatencyUpdater
+        * 
+        * @param interval_s The time interval in seconds after which the 
callback should be executed
+        * @param arg Optional argument to pass to the callback function
+        */
+        ScheduledLatencyUpdater(size_t interval_s, void* arg = nullptr)
+                : bvar::LatencyRecorder(interval_s), _interval_s(interval_s), 
_arg(arg) {
+            hide();
+        }
+
+        /**
+        * @brief Destructor
+        * 
+        * Stops the timer if it's still running to prevent any callbacks after 
destruction
+        */
+        ~ScheduledLatencyUpdater() { stop(); }
+
+        /**
+        * @brief Start the timer
+        * 
+        * Schedules the callback function to be executed after the specified 
interval.
+        * Does nothing if the timer has already been started.
+        * 
+        * @return true if the timer was successfully started, false otherwise
+        */
+        bool start() {
+            if (!_started.load()) {
+                {
+                    std::lock_guard<bthread::Mutex> l(init_mutex_);
+                    if (!_started.load()) {
+                        if (!schedule()) {
+                            return false;
+                        }
+                        _started.store(true);
+                    }
+                    return true;
+                }
+            }
+            return true;
+        }
+
+        /*** @brief Reschedule the timer
+        * 
+        * Scheduling a one-time task.
+        * This is useful if you want to reset the timer interval.
+        */
+        bool schedule() {
+            if (bthread_timer_add(&_timer, 
butil::seconds_from_now(_interval_s), update, this) !=
+                0) {
+                LOG(WARNING) << "Failed to add bthread timer for 
ScheduledLatencyUpdater";
+                return false;
+            }
+            return true;
+        }
+
+        /**
+        * @brief Background update function
+        * 
+        * This function is called periodically by the timer to update the 
status metrics
+        * with the current values from the latency recorders.
+        * 
+        * @param arg Pointer to the mBvarLatencyRecorderWithStatus instance
+        */
+        static void update(void* arg) {
+            auto* latency_updater = static_cast<ScheduledLatencyUpdater*>(arg);
+            if (!latency_updater || !latency_updater->_started) {
+                LOG(WARNING) << "Invalid ScheduledLatencyUpdater in timer 
callback";
+                return;
+            }
+
+            VLOG_DEBUG << "Timer triggered for ScheduledLatencyUpdater, 
interval: "
+                       << latency_updater->_interval_s << "s";
+
+            auto* parent = 
static_cast<MBvarLatencyRecorderWithStatus*>(latency_updater->_arg);
+            if (!parent) {
+                LOG(WARNING) << "Invalid parent container in timer callback";
+                return;
+            }
+
+            std::list<std::string> current_dim_list;
+            {
+                std::lock_guard<bthread::Mutex> l(parent->recorder_mutex_);
+                for (const auto& it : parent->recorder_) {
+                    if (it.second.get() == latency_updater) {
+                        current_dim_list = it.first;
+                        break;
+                    }
+                }
+            }
+
+            if (current_dim_list.empty()) {
+                LOG(WARNING) << "Could not find dimension for 
ScheduledLatencyUpdater";
+                return;
+            }
+
+            {
+                std::lock_guard<bthread::Mutex> l(parent->timer_mutex_);
+
+                bvar::Status<int64_t>* max_status = 
parent->max_status_.get_stats(current_dim_list);
+                bvar::Status<int64_t>* avg_status = 
parent->avg_status_.get_stats(current_dim_list);
+                bvar::Status<int64_t>* count_status =
+                        parent->count_status_.get_stats(current_dim_list);
+
+                VLOG_DEBUG << "Updating latency recorder status for dimension, 
"
+                           << "max_latency: " << latency_updater->max_latency()
+                           << ", avg_latency: " << latency_updater->latency();
+                TEST_SYNC_POINT("mBvarLatencyRecorderWithStatus::update");
+
+                if (max_status) {
+                    max_status->set_value(latency_updater->max_latency());
+                }
+                if (avg_status) {
+                    avg_status->set_value(latency_updater->latency());
+                }
+                if (count_status) {
+                    count_status->set_value(latency_updater->count());
+                }
+            }
+
+            if (latency_updater->_started && !latency_updater->schedule()) {
+                LOG(WARNING) << "Failed to reschedule timer for 
ScheduledLatencyUpdater";
+                latency_updater->_started = false;
+            }
+        }
+
+        /**
+        * @brief Stop the timer
+        * 
+        * Cancels the timer if it's running and marks arguments as invalid to 
prevent
+        * any pending callbacks from accessing potentially freed resources.
+        */
+        void stop() {
+            if (_started.load()) {
+                bthread_timer_del(_timer);
+                _started = false;
+            }
+        }
+
+    private:
+        int _interval_s;                   // Timer interval in seconds
+        void* _arg;                        // Argument to pass to the callback
+        bthread_timer_t _timer;            // The bthread timer handle
+        std::atomic_bool _started {false}; // Whether the timer has been 
started
+        bthread::Mutex init_mutex_;        // Mutex for timer_map_
+    };
+
+public:
+    /**
+     * @brief Constructor
+     * 
+     * @param metric_name Base name for the metrics
+     * @param dim_names List of dimension names
+     */
+    MBvarLatencyRecorderWithStatus(const std::string& metric_name,
+                                   const std::initializer_list<std::string>& 
dim_names)
+            : _metric_name(metric_name),
+              max_status_(metric_name + "_max", 
std::list<std::string>(dim_names)),
+              avg_status_(metric_name + "_avg", 
std::list<std::string>(dim_names)),
+              count_status_(metric_name + "_count", 
std::list<std::string>(dim_names)) {}
+
+    MBvarLatencyRecorderWithStatus(const std::string& prefix, const 
std::string& metric_name,
+                                   const std::initializer_list<std::string>& 
dim_names)
+            : MBvarLatencyRecorderWithStatus(prefix + "_" + metric_name, 
dim_names) {}
+
+    /**
+     * @brief Record a latency value
+     * 
+     * @param dim_values List of dimension values (must match the number of 
dimensions)
+     * @param value The latency value to record
+     */
+    void put(const std::initializer_list<std::string>& dim_values, int64_t 
value) {
+        std::list<std::string> dim_list(dim_values);
+        std::shared_ptr<ScheduledLatencyUpdater> latency = nullptr;
+        {
+            std::lock_guard<bthread::Mutex> l(recorder_mutex_);
+            auto it = recorder_.find(dim_list);
+            if (it == recorder_.end()) {
+                int inteval_s = N;
+                
TEST_SYNC_POINT_CALLBACK("mBvarLatencyRecorderWithStatus::put", &inteval_s);
+                latency = std::make_shared<ScheduledLatencyUpdater>(inteval_s, 
this);
+                recorder_[dim_list] = latency;
+            } else {
+                latency = it->second;
+            }
+        }
+
+        auto* latency_ptr = latency.get();
+        latency->start();
+        *latency_ptr << value;
+    }
+
+    int64_t get_max(const std::initializer_list<std::string>& dim_values) {
+        return 
max_status_.get_stats(std::list<std::string>(dim_values))->get_value();
+    }
+
+    int64_t get_avg(const std::initializer_list<std::string>& dim_values) {
+        return 
avg_status_.get_stats(std::list<std::string>(dim_values))->get_value();
+    }
+
+    int64_t get_count(const std::initializer_list<std::string>& dim_values) {
+        return 
count_status_.get_stats(std::list<std::string>(dim_values))->get_value();
+    }
+
+private:
+    std::string _metric_name;
+    // dim_names -> recorder
+    std::map<std::list<std::string>, std::shared_ptr<ScheduledLatencyUpdater>> 
recorder_;
+    bvar::MultiDimension<bvar::Status<int64_t>> max_status_;
+    bvar::MultiDimension<bvar::Status<int64_t>> avg_status_;
+    bvar::MultiDimension<bvar::Status<int64_t>> count_status_;
+    bthread::Mutex recorder_mutex_; // Mutex for recorder_
+    bthread::Mutex timer_mutex_;    // Mutex for timer_map_
+};
 
 using mBvarIntAdder = mBvarWrapper<bvar::Adder<int>>;
 using mBvarInt64Adder = mBvarWrapper<bvar::Adder<int64_t>>;
@@ -256,6 +610,10 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_schema_dict;
 extern bvar::Adder<int64_t> g_bvar_update_delete_bitmap_fail_counter;
 extern bvar::Adder<int64_t> g_bvar_get_delete_bitmap_fail_counter;
+extern BvarLatencyRecorderWithStatus<60> 
g_bvar_ms_txn_commit_with_tablet_count;
+extern BvarLatencyRecorderWithStatus<60> 
g_bvar_ms_txn_commit_with_partition_count;
+extern MBvarLatencyRecorderWithStatus<60> 
g_bvar_instance_txn_commit_with_partition_count;
+extern MBvarLatencyRecorderWithStatus<60> 
g_bvar_instance_txn_commit_with_tablet_count;
 
 // recycler's bvars
 extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_index_earlest_ts;
diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index d29eb4488f2..8e1c2b0ed17 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -62,6 +62,11 @@ static void* run_bthread_work(void* arg) {
            lock_id == SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID;
 }
 
+[[maybe_unused]] void record_txn_commit_stats(doris::cloud::Transaction* txn,
+                                              const std::string& instance_id,
+                                              int64_t partition_count, int64_t 
tablet_count,
+                                              int64_t txn_id);
+
 class MetaServiceImpl : public cloud::MetaService {
 public:
     MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv, 
std::shared_ptr<ResourceManager> resource_mgr,
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 4ed3c2a1c3d..3e16bbd106e 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1087,6 +1087,8 @@ void commit_txn_immediately(
 
         // {table/partition} -> version
         std::unordered_map<std::string, uint64_t> new_versions;
+        // partition_id
+        std::unordered_set<int64_t> partition_ids;
         std::vector<std::string> version_keys;
         for (auto& [_, i] : tmp_rowsets_meta) {
             int64_t tablet_id = i.tablet_id();
@@ -1098,6 +1100,7 @@ void commit_txn_immediately(
                 new_versions.insert({ver_key, 0});
                 version_keys.push_back(std::move(ver_key));
             }
+            partition_ids.insert(partition_id);
         }
         std::vector<std::optional<std::string>> version_values;
         err = txn->batch_get(&version_values, version_keys);
@@ -1156,6 +1159,7 @@ void commit_txn_immediately(
         }
 
         std::vector<std::pair<std::string, std::string>> rowsets;
+
         std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> 
stats
         rowsets.reserve(tmp_rowsets_meta.size());
 
@@ -1644,6 +1648,8 @@ void commit_txn_eventually(
 
         // <partition_version_key, version>
         std::unordered_map<std::string, uint64_t> new_versions;
+        // partition_id
+        std::unordered_set<int64_t> partition_ids;
         std::vector<std::string> version_keys;
         for (auto& [_, i] : tmp_rowsets_meta) {
             int64_t tablet_id = i.tablet_id();
@@ -1655,6 +1661,7 @@ void commit_txn_eventually(
                 new_versions.insert({ver_key, 0});
                 version_keys.push_back(std::move(ver_key));
             }
+            partition_ids.insert(partition_id);
         }
 
         std::vector<std::optional<std::string>> version_values;
@@ -1716,6 +1723,9 @@ void commit_txn_eventually(
             continue;
         }
 
+        record_txn_commit_stats(txn.get(), instance_id, partition_ids.size(), 
tablet_ids.size(),
+                                txn_id);
+
         std::string info_val;
         const std::string info_key = txn_info_key({instance_id, db_id, 
txn_id});
         err = txn->get(info_key, &info_val);
@@ -2336,6 +2346,9 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* 
request, CommitTxnResponse*
                 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
 
         std::vector<std::pair<std::string, std::string>> rowsets;
+        record_txn_commit_stats(txn.get(), instance_id, partition_ids.size(), 
tablet_ids.size(),
+                                txn_id);
+
         std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> 
stats
         for (const auto& sub_txn_info : sub_txn_infos) {
             auto sub_txn_id = sub_txn_info.sub_txn_id();
@@ -4092,4 +4105,17 @@ void 
MetaServiceImpl::get_txn_id(::google::protobuf::RpcController* controller,
     return;
 }
 
+void record_txn_commit_stats(doris::cloud::Transaction* txn, const 
std::string& instance_id,
+                             int64_t partition_count, int64_t tablet_count, 
int64_t txn_id) {
+    int64_t kv_count = txn->num_put_keys() + txn->num_del_keys() + 
txn->num_get_keys();
+    int64_t kv_bytes = txn->get_bytes();
+    LOG(INFO) << "txn commit stats, instance_id: " << instance_id << ", 
txn_id: " << txn_id
+              << ", kv_count: " << kv_count << ", kv_bytes: " << kv_bytes
+              << ", partition_count: " << partition_count << ", tablet_count: 
" << tablet_count;
+    g_bvar_ms_txn_commit_with_partition_count << partition_count;
+    g_bvar_ms_txn_commit_with_tablet_count << tablet_count;
+    g_bvar_instance_txn_commit_with_partition_count.put({instance_id}, 
partition_count);
+    g_bvar_instance_txn_commit_with_tablet_count.put({instance_id}, 
tablet_count);
+}
+
 } // namespace doris::cloud
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 4208eb7019f..0a68b019721 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -67,6 +67,9 @@ add_executable(network_util_test network_util_test.cpp)
 
 add_executable(txn_lazy_commit_test txn_lazy_commit_test.cpp)
 
+
+add_executable(bvars_test bvars_test.cpp)
+
 message("Meta-service test dependencies: ${TEST_LINK_LIBS}")
 #target_link_libraries(sync_point_test ${TEST_LINK_LIBS})
 
@@ -138,6 +141,10 @@ target_link_libraries(fdb_injection_test
     ${FDB_LINKER_FLAGS}
     ${TEST_LINK_LIBS})
 
+target_link_libraries(bvars_test
+    ${FDB_LINKER_FLAGS}
+    ${TEST_LINK_LIBS})
+
 install(FILES
     ${BASE_DIR}/script/run_all_tests.sh
     ${BASE_DIR}/conf/fdb.cluster
diff --git a/cloud/test/bvars_test.cpp b/cloud/test/bvars_test.cpp
new file mode 100644
index 00000000000..8f895354f18
--- /dev/null
+++ b/cloud/test/bvars_test.cpp
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/bvars.h"
+
+#include <brpc/server.h>
+#include <bthread/bthread.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <chrono>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "common/logging.h"
+#include "meta-service/meta_service.h"
+#include "meta-store/mem_txn_kv.h"
+
+namespace doris::cloud {
+
+int main(int argc, char** argv) {
+    if (!cloud::init_glog("bvars_test")) {
+        std::cerr << "failed to init glog" << std::endl;
+        return -1;
+    }
+
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
+
+class BvarsTest : public ::testing::Test {
+public:
+    void SetUp() override {
+        if (server.Start("0.0.0.0:0", &options) == -1) {
+            perror("Start brpc server");
+        }
+    }
+    void TearDown() override {
+        server.Stop(0);
+        server.Join();
+    }
+    brpc::ServerOptions options;
+    brpc::Server server;
+};
+TEST(BvarsTest, MultiThreadRecordMetrics) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    std::vector<std::thread> threads;
+    std::atomic_long update_count = 0;
+    MBvarLatencyRecorderWithStatus<60> 
mbvarlr_partition_test("partition_test", {"instance_id"});
+    MBvarLatencyRecorderWithStatus<60> mbvarlr_tablet_test("tablet_test", 
{"instance_id"});
+
+    int interval_s = 60;
+    auto* sp = SyncPoint::get_instance();
+    sp->set_call_back("mBvarLatencyRecorderWithStatus::put", 
[&interval_s](auto&& args) {
+        auto* vault = try_any_cast<int*>(args[0]);
+        *vault = interval_s;
+    });
+    sp->set_call_back("mBvarLatencyRecorderWithStatus::update",
+                      [&update_count](auto&& args) { update_count++; });
+    sp->enable_processing();
+
+    for (int i = 0; i < 20; ++i) {
+        threads.emplace_back([&]() {
+            for (size_t j = 0; j < 1000; j++) {
+                std::unique_ptr<Transaction> txn;
+                ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+
+                std::string instance_id = "instance_" + std::to_string(j % 
100);
+                int64_t partition_count = 10 + j;
+                int64_t tablet_count = 50 + j;
+
+                mbvarlr_partition_test.put({instance_id}, partition_count);
+                mbvarlr_tablet_test.put({instance_id}, tablet_count);
+
+                std::this_thread::sleep_for(std::chrono::microseconds(1000));
+            }
+        });
+    }
+
+    for (int i = 0; i < 20; ++i) {
+        threads.emplace_back([&]() {
+            for (size_t j = 0; j < 1000; j++) {
+                std::unique_ptr<Transaction> txn;
+                ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+
+                std::string instance_id = "instance_" + std::to_string(j % 
100);
+
+                mbvarlr_partition_test.get_count({instance_id});
+                mbvarlr_partition_test.get_avg({instance_id});
+                mbvarlr_partition_test.get_max({instance_id});
+                mbvarlr_tablet_test.get_count({instance_id});
+                mbvarlr_tablet_test.get_avg({instance_id});
+                mbvarlr_tablet_test.get_max({instance_id});
+
+                std::this_thread::sleep_for(std::chrono::microseconds(1000));
+            }
+        });
+    }
+
+    for (auto& t : threads) {
+        t.join();
+    }
+
+    sleep(interval_s * 3);
+
+    ASSERT_GT(update_count.load(), 200);
+}
+
+} // namespace doris::cloud
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to