This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new cc9dc331a76 branch-3.1: [feat](txn) Add txn write amplification brpc
metrics #55544 (#56832)
cc9dc331a76 is described below
commit cc9dc331a76cb3883bc3f4a36d1c2f4ddc489be5
Author: Yixuan Wang <[email protected]>
AuthorDate: Mon Oct 13 12:09:15 2025 +0800
branch-3.1: [feat](txn) Add txn write amplification brpc metrics #55544
(#56832)
picked from #55544
---
cloud/src/common/bvars.cpp | 10 +
cloud/src/common/bvars.h | 358 ++++++++++++++++++++++++++++
cloud/src/meta-service/meta_service.h | 5 +
cloud/src/meta-service/meta_service_txn.cpp | 26 ++
cloud/test/CMakeLists.txt | 7 +
cloud/test/bvars_test.cpp | 129 ++++++++++
6 files changed, 535 insertions(+)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index a3448306110..c1812e47a64 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -17,7 +17,9 @@
#include "common/bvars.h"
+#include <bvar/latency_recorder.h>
#include <bvar/multi_dimension.h>
+#include <bvar/passive_status.h>
#include <bvar/reducer.h>
#include <bvar/status.h>
#include <bvar/window.h>
@@ -25,6 +27,8 @@
#include <cstdint>
#include <stdexcept>
+#include "meta-store/txn_kv.h"
+
// clang-format off
// meta-service's bvars
@@ -97,6 +101,12 @@ bvar::Adder<int64_t>
g_bvar_update_delete_bitmap_fail_counter;
bvar::Window<bvar::Adder<int64_t> >
g_bvar_update_delete_bitmap_fail_counter_minute("ms",
"update_delete_bitmap_fail", &g_bvar_update_delete_bitmap_fail_counter, 60);
bvar::Adder<int64_t> g_bvar_get_delete_bitmap_fail_counter;
bvar::Window<bvar::Adder<int64_t> >
g_bvar_get_delete_bitmap_fail_counter_minute("ms", "get_delete_bitmap_fail",
&g_bvar_get_delete_bitmap_fail_counter, 60);
+// Per-commit write amplification (partitions / tablets touched), service-wide.
+BvarLatencyRecorderWithStatus<60> g_bvar_ms_txn_commit_with_partition_count(
+        "ms", "txn_commit_with_partition_count");
+BvarLatencyRecorderWithStatus<60> g_bvar_ms_txn_commit_with_tablet_count(
+        "ms", "txn_commit_with_tablet_count");
+
+// The same two metrics broken down by the "instance_id" dimension.
+MBvarLatencyRecorderWithStatus<60> g_bvar_instance_txn_commit_with_partition_count(
+        "instance", "txn_commit_with_partition_count", {"instance_id"});
+MBvarLatencyRecorderWithStatus<60> g_bvar_instance_txn_commit_with_tablet_count(
+        "instance", "txn_commit_with_tablet_count", {"instance_id"});
+
// recycler's bvars
// TODO: use mbvar for per instance,
https://github.com/apache/brpc/blob/master/docs/cn/mbvar_c++.md
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index a3ec5a8d26a..13d50ddaa33 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -17,13 +17,20 @@
#pragma once
+#include <aws/core/external/cjson/cJSON.h>
+#include <bthread/bthread.h>
#include <bthread/mutex.h>
+#include <bthread/unstable.h>
#include <bvar/bvar.h>
#include <bvar/latency_recorder.h>
#include <bvar/multi_dimension.h>
+#include <bvar/passive_status.h>
#include <bvar/reducer.h>
#include <bvar/status.h>
+#include <cpp/sync_point.h>
+#include <gmock/gmock-actions.h>
+#include <atomic>
#include <cstdint>
#include <initializer_list>
#include <map>
@@ -33,6 +40,8 @@
#include <type_traits>
#include <utility>
+#include "common/logging.h"
+
/**
* Manage bvars that with similar names (identical prefix)
* ${module}_${name}_${tag}
@@ -177,6 +186,351 @@ private:
bvar::MultiDimension<BvarType> counter_;
};
+/**
+ * @class BvarLatencyRecorderWithStatus
+ * @brief A latency recorder with auto-exposed max, average and count metrics
+ *
+ * Wraps a hidden bvar::LatencyRecorder and exposes three PassiveStatus metrics
+ * that are computed from it whenever they are queried:
+ *   - <metric_name>_max    maximum value recorded in the window
+ *   - <metric_name>_avg    average value recorded in the window
+ *   - <metric_name>_count  number of recorded values
+ * Negative readings are reported as 0.
+ *
+ * @tparam N Window size in seconds for the latency recorder, defaults to 60
+ *
+ * @note Unlike MBvarLatencyRecorderWithStatus, this class has a single
+ *       dimension and needs no timer: the PassiveStatus callbacks read the
+ *       recorder on demand.
+ *
+ * @example
+ *   BvarLatencyRecorderWithStatus<> my_latency("my_service_latency");
+ *   my_latency << 1500;      // or my_latency.put(1500);
+ *   // Exposes my_service_latency_max, my_service_latency_avg and
+ *   // my_service_latency_count; the underlying recorder stays hidden.
+ */
+template <int N = 60>
+class BvarLatencyRecorderWithStatus {
+public:
+    /**
+     * @brief Constructor
+     * @param metric_name Base name for the metrics; "_max", "_avg" and
+     *                    "_count" suffixes are appended
+     */
+    explicit BvarLatencyRecorderWithStatus(const std::string& metric_name)
+            : recorder_(N),
+              max_status_(metric_name + "_max", get_max_latency, this),
+              avg_status_(metric_name + "_avg", get_avg_latency, this),
+              count_status_(metric_name + "_count", get_count_latency, this) {
+        recorder_.hide();
+    }
+
+    /**
+     * @brief Constructor with prefix
+     * @param prefix Prefix for the metric name, joined with "_"
+     * @param metric_name Base name for the metrics
+     */
+    BvarLatencyRecorderWithStatus(const std::string& prefix, const std::string& metric_name)
+            : BvarLatencyRecorderWithStatus(prefix + "_" + metric_name) {}
+
+    /**
+     * @brief Record a value
+     * @param value Value to record (e.g. a latency in microseconds or a count)
+     */
+    void put(int64_t value) { recorder_ << value; }
+
+    /**
+     * @brief Stream operator for recording values
+     * @param value Value to record
+     * @return *this, so that recordings can be chained
+     */
+    BvarLatencyRecorderWithStatus& operator<<(int64_t value) {
+        recorder_ << value;
+        return *this;
+    }
+
+    int64_t max() const { return recorder_.max_latency(); }
+
+    int64_t avg() const { return recorder_.latency(); }
+
+    int64_t count() const { return recorder_.count(); }
+
+private:
+    bvar::LatencyRecorder recorder_;             // The underlying (hidden) recorder
+    bvar::PassiveStatus<int64_t> max_status_;   // Exposes <name>_max
+    bvar::PassiveStatus<int64_t> avg_status_;   // Exposes <name>_avg
+    bvar::PassiveStatus<int64_t> count_status_; // Exposes <name>_count
+
+    /// Clamps negative readings to 0 before they are exposed.
+    static int64_t non_negative(int64_t value) { return value >= 0 ? value : 0; }
+
+    /// PassiveStatus callback; @p arg is the owning BvarLatencyRecorderWithStatus.
+    static int64_t get_max_latency(void* arg) {
+        return non_negative(static_cast<BvarLatencyRecorderWithStatus*>(arg)->max());
+    }
+
+    /// PassiveStatus callback; @p arg is the owning BvarLatencyRecorderWithStatus.
+    static int64_t get_avg_latency(void* arg) {
+        return non_negative(static_cast<BvarLatencyRecorderWithStatus*>(arg)->avg());
+    }
+
+    /// PassiveStatus callback; @p arg is the owning BvarLatencyRecorderWithStatus.
+    static int64_t get_count_latency(void* arg) {
+        return non_negative(static_cast<BvarLatencyRecorderWithStatus*>(arg)->count());
+    }
+};
+
+/**
+ * @class MBvarLatencyRecorderWithStatus
+ * @brief A multi-dimensional latency recorder with status metrics
+ *
+ * Values are recorded per dimension combination (e.g. per instance_id). Each
+ * combination owns a hidden bvar::LatencyRecorder whose bthread timer copies
+ * the window's max, average and count into three exposed multi-dimensional
+ * bvar::Status metrics (<name>_max, <name>_avg, <name>_count) every N seconds,
+ * so readers observe values that can be up to one interval old.
+ *
+ * @tparam N Window size in seconds of each recorder, and the refresh interval
+ *           of the exposed status metrics (default: 60)
+ *
+ * @note Requires bthread timers. If a timer cannot be scheduled, values are
+ *       still recorded but the exposed status is not refreshed.
+ */
+template <int N = 60>
+class MBvarLatencyRecorderWithStatus {
+private:
+    /**
+     * @class ScheduledLatencyUpdater
+     * @brief Hidden latency recorder for one dimension combination that
+     *        periodically publishes its statistics to the parent's metrics.
+     *
+     * Publication is driven by a one-shot bthread timer that update() re-arms
+     * after every run. The updater remembers its own dimension values, so the
+     * timer callback does not have to search the parent's recorder map.
+     */
+    class ScheduledLatencyUpdater : public bvar::LatencyRecorder {
+    public:
+        /**
+         * @param interval_s Window size and publication interval in seconds
+         * @param dim_list   Dimension values this updater publishes under
+         * @param arg        The owning MBvarLatencyRecorderWithStatus
+         */
+        ScheduledLatencyUpdater(size_t interval_s, std::list<std::string> dim_list,
+                                void* arg = nullptr)
+                : bvar::LatencyRecorder(interval_s),
+                  _interval_s(static_cast<int>(interval_s)),
+                  _dim_list(std::move(dim_list)),
+                  _arg(arg) {
+            hide();
+        }
+
+        /**
+         * @brief Destructor; stops the timer chain.
+         *
+         * NOTE(review): per the brpc docs, bthread_timer_del() does not wait for
+         * a callback that is already running. Destroying an updater while
+         * update() executes is therefore unsafe. Instances are expected to be
+         * long-lived — confirm for any new caller.
+         */
+        ~ScheduledLatencyUpdater() { stop(); }
+
+        /**
+         * @brief Start the periodic publication if it is not running yet.
+         * @return true if the timer is running (already or newly), false if it
+         *         could not be scheduled
+         */
+        bool start() {
+            if (_started.load()) {
+                return true;
+            }
+            std::lock_guard<bthread::Mutex> l(init_mutex_);
+            if (_started.load()) {
+                return true;
+            }
+            // Mark as started before arming the timer, so a callback that fires
+            // immediately does not mistake itself for a stale timer.
+            _started.store(true);
+            if (!schedule()) {
+                _started.store(false);
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * @brief Arm a one-shot timer that calls update() after the interval.
+         * @return true on success
+         */
+        bool schedule() {
+            if (bthread_timer_add(&_timer, butil::seconds_from_now(_interval_s), update, this) !=
+                0) {
+                LOG(WARNING) << "Failed to add bthread timer for ScheduledLatencyUpdater";
+                return false;
+            }
+            return true;
+        }
+
+        /**
+         * @brief Timer callback: publish max/avg/count, then re-arm the timer.
+         * @param arg Pointer to the ScheduledLatencyUpdater that armed the timer
+         */
+        static void update(void* arg) {
+            auto* latency_updater = static_cast<ScheduledLatencyUpdater*>(arg);
+            if (!latency_updater || !latency_updater->_started) {
+                LOG(WARNING) << "Invalid ScheduledLatencyUpdater in timer callback";
+                return;
+            }
+
+            VLOG_DEBUG << "Timer triggered for ScheduledLatencyUpdater, interval: "
+                       << latency_updater->_interval_s << "s";
+
+            auto* parent = static_cast<MBvarLatencyRecorderWithStatus*>(latency_updater->_arg);
+            if (!parent) {
+                LOG(WARNING) << "Invalid parent container in timer callback";
+                return;
+            }
+
+            // The dimension is fixed at construction. No lookup in
+            // parent->recorder_ (and no recorder_mutex_) is needed.
+            const std::list<std::string>& current_dim_list = latency_updater->_dim_list;
+            if (current_dim_list.empty()) {
+                LOG(WARNING) << "Could not find dimension for ScheduledLatencyUpdater";
+                return;
+            }
+
+            {
+                std::lock_guard<bthread::Mutex> l(parent->timer_mutex_);
+
+                bvar::Status<int64_t>* max_status =
+                        parent->max_status_.get_stats(current_dim_list);
+                bvar::Status<int64_t>* avg_status =
+                        parent->avg_status_.get_stats(current_dim_list);
+                bvar::Status<int64_t>* count_status =
+                        parent->count_status_.get_stats(current_dim_list);
+
+                VLOG_DEBUG << "Updating latency recorder status for dimension, "
+                           << "max_latency: " << latency_updater->max_latency()
+                           << ", avg_latency: " << latency_updater->latency();
+                TEST_SYNC_POINT("mBvarLatencyRecorderWithStatus::update");
+
+                // get_stats() may return nullptr; skip those metrics.
+                if (max_status) {
+                    max_status->set_value(latency_updater->max_latency());
+                }
+                if (avg_status) {
+                    avg_status->set_value(latency_updater->latency());
+                }
+                if (count_status) {
+                    count_status->set_value(latency_updater->count());
+                }
+            }
+
+            // Re-arm the one-shot timer unless stop() was called meanwhile.
+            if (latency_updater->_started && !latency_updater->schedule()) {
+                LOG(WARNING) << "Failed to reschedule timer for ScheduledLatencyUpdater";
+                latency_updater->_started = false;
+            }
+        }
+
+        /**
+         * @brief Stop the timer chain.
+         *
+         * Clears _started before deleting the timer, so a concurrently running
+         * update() does not re-arm a new timer after this call.
+         */
+        void stop() {
+            if (_started.exchange(false)) {
+                bthread_timer_del(_timer);
+            }
+        }
+
+    private:
+        int _interval_s;                   // Timer interval (and window) in seconds
+        std::list<std::string> _dim_list;  // Dimension values published by this updater
+        void* _arg;                        // The owning MBvarLatencyRecorderWithStatus
+        bthread_timer_t _timer {};         // Handle of the most recently armed timer
+        std::atomic_bool _started {false}; // Whether the timer chain is active
+        bthread::Mutex init_mutex_;        // Serializes start()
+    };
+
+public:
+    /**
+     * @brief Constructor
+     *
+     * @param metric_name Base name for the metrics
+     * @param dim_names List of dimension names
+     */
+    MBvarLatencyRecorderWithStatus(const std::string& metric_name,
+                                   const std::initializer_list<std::string>& dim_names)
+            : _metric_name(metric_name),
+              max_status_(metric_name + "_max", std::list<std::string>(dim_names)),
+              avg_status_(metric_name + "_avg", std::list<std::string>(dim_names)),
+              count_status_(metric_name + "_count", std::list<std::string>(dim_names)) {}
+
+    /**
+     * @brief Constructor with prefix; the metric name is prefix + "_" + metric_name.
+     */
+    MBvarLatencyRecorderWithStatus(const std::string& prefix, const std::string& metric_name,
+                                   const std::initializer_list<std::string>& dim_names)
+            : MBvarLatencyRecorderWithStatus(prefix + "_" + metric_name, dim_names) {}
+
+    /**
+     * @brief Record a value for one dimension combination.
+     *
+     * Creates (and starts) the per-combination recorder on first use.
+     *
+     * @param dim_values Dimension values (must match the number of dimensions)
+     * @param value The value to record
+     */
+    void put(const std::initializer_list<std::string>& dim_values, int64_t value) {
+        std::list<std::string> dim_list(dim_values);
+        std::shared_ptr<ScheduledLatencyUpdater> latency;
+        {
+            std::lock_guard<bthread::Mutex> l(recorder_mutex_);
+            auto it = recorder_.find(dim_list);
+            if (it == recorder_.end()) {
+                int interval_s = N;
+                // Lets tests shorten the publication interval.
+                TEST_SYNC_POINT_CALLBACK("mBvarLatencyRecorderWithStatus::put", &interval_s);
+                latency = std::make_shared<ScheduledLatencyUpdater>(interval_s, dim_list, this);
+                recorder_.emplace(std::move(dim_list), latency);
+            } else {
+                latency = it->second;
+            }
+        }
+
+        latency->start();
+        *latency << value;
+    }
+
+    /// Last published max for @p dim_values, or 0 if no stats object is available.
+    int64_t get_max(const std::initializer_list<std::string>& dim_values) {
+        return read_status(max_status_, dim_values);
+    }
+
+    /// Last published average for @p dim_values, or 0 if no stats object is available.
+    int64_t get_avg(const std::initializer_list<std::string>& dim_values) {
+        return read_status(avg_status_, dim_values);
+    }
+
+    /// Last published count for @p dim_values, or 0 if no stats object is available.
+    int64_t get_count(const std::initializer_list<std::string>& dim_values) {
+        return read_status(count_status_, dim_values);
+    }
+
+private:
+    /// Reads a published status value, guarding against get_stats() returning nullptr.
+    static int64_t read_status(bvar::MultiDimension<bvar::Status<int64_t>>& status,
+                               const std::initializer_list<std::string>& dim_values) {
+        bvar::Status<int64_t>* stats = status.get_stats(std::list<std::string>(dim_values));
+        return stats != nullptr ? stats->get_value() : 0;
+    }
+
+    std::string _metric_name;
+    // dimension values -> recorder/updater for that combination
+    std::map<std::list<std::string>, std::shared_ptr<ScheduledLatencyUpdater>> recorder_;
+    bvar::MultiDimension<bvar::Status<int64_t>> max_status_;
+    bvar::MultiDimension<bvar::Status<int64_t>> avg_status_;
+    bvar::MultiDimension<bvar::Status<int64_t>> count_status_;
+    bthread::Mutex recorder_mutex_; // Guards recorder_
+    bthread::Mutex timer_mutex_;    // Serializes status publication across updaters
+};
using mBvarIntAdder = mBvarWrapper<bvar::Adder<int>>;
using mBvarInt64Adder = mBvarWrapper<bvar::Adder<int64_t>>;
@@ -256,6 +610,10 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_schema_dict;
extern bvar::Adder<int64_t> g_bvar_update_delete_bitmap_fail_counter;
extern bvar::Adder<int64_t> g_bvar_get_delete_bitmap_fail_counter;
+// Txn write-amplification metrics (partitions / tablets per commit), defined in bvars.cpp.
+extern BvarLatencyRecorderWithStatus<60> g_bvar_ms_txn_commit_with_partition_count;
+extern BvarLatencyRecorderWithStatus<60> g_bvar_ms_txn_commit_with_tablet_count;
+extern MBvarLatencyRecorderWithStatus<60> g_bvar_instance_txn_commit_with_partition_count;
+extern MBvarLatencyRecorderWithStatus<60> g_bvar_instance_txn_commit_with_tablet_count;
// recycler's bvars
extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_index_earlest_ts;
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index d29eb4488f2..8e1c2b0ed17 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -62,6 +62,11 @@ static void* run_bthread_work(void* arg) {
lock_id == SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID;
}
+/**
+ * Logs the KV operation count and byte size accumulated in `txn`, and records
+ * the partition/tablet counts of a commit in the txn_commit_with_*_count bvars
+ * (service-wide and per instance_id).
+ */
+[[maybe_unused]] void record_txn_commit_stats(doris::cloud::Transaction* txn,
+                                              const std::string& instance_id,
+                                              int64_t partition_count, int64_t tablet_count,
+                                              int64_t txn_id);
+
class MetaServiceImpl : public cloud::MetaService {
public:
MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv,
std::shared_ptr<ResourceManager> resource_mgr,
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 4ed3c2a1c3d..3e16bbd106e 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1087,6 +1087,8 @@ void commit_txn_immediately(
// {table/partition} -> version
std::unordered_map<std::string, uint64_t> new_versions;
+ // partition_id
+ std::unordered_set<int64_t> partition_ids;
std::vector<std::string> version_keys;
for (auto& [_, i] : tmp_rowsets_meta) {
int64_t tablet_id = i.tablet_id();
@@ -1098,6 +1100,7 @@ void commit_txn_immediately(
new_versions.insert({ver_key, 0});
version_keys.push_back(std::move(ver_key));
}
+ partition_ids.insert(partition_id);
}
std::vector<std::optional<std::string>> version_values;
err = txn->batch_get(&version_values, version_keys);
@@ -1156,6 +1159,7 @@ void commit_txn_immediately(
}
std::vector<std::pair<std::string, std::string>> rowsets;
+
std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id ->
stats
rowsets.reserve(tmp_rowsets_meta.size());
@@ -1644,6 +1648,8 @@ void commit_txn_eventually(
// <partition_version_key, version>
std::unordered_map<std::string, uint64_t> new_versions;
+ // partition_id
+ std::unordered_set<int64_t> partition_ids;
std::vector<std::string> version_keys;
for (auto& [_, i] : tmp_rowsets_meta) {
int64_t tablet_id = i.tablet_id();
@@ -1655,6 +1661,7 @@ void commit_txn_eventually(
new_versions.insert({ver_key, 0});
version_keys.push_back(std::move(ver_key));
}
+ partition_ids.insert(partition_id);
}
std::vector<std::optional<std::string>> version_values;
@@ -1716,6 +1723,9 @@ void commit_txn_eventually(
continue;
}
+ record_txn_commit_stats(txn.get(), instance_id, partition_ids.size(),
tablet_ids.size(),
+ txn_id);
+
std::string info_val;
const std::string info_key = txn_info_key({instance_id, db_id,
txn_id});
err = txn->get(info_key, &info_val);
@@ -2336,6 +2346,9 @@ void commit_txn_with_sub_txn(const CommitTxnRequest*
request, CommitTxnResponse*
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
std::vector<std::pair<std::string, std::string>> rowsets;
+ record_txn_commit_stats(txn.get(), instance_id, partition_ids.size(),
tablet_ids.size(),
+ txn_id);
+
std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id ->
stats
for (const auto& sub_txn_info : sub_txn_infos) {
auto sub_txn_id = sub_txn_info.sub_txn_id();
@@ -4092,4 +4105,17 @@ void
MetaServiceImpl::get_txn_id(::google::protobuf::RpcController* controller,
return;
}
+/**
+ * Logs per-transaction KV statistics, and records the partition/tablet counts
+ * of a commit in the service-wide and per-instance bvars.
+ *
+ * @param txn             transaction whose counters are read; the numbers only
+ *                        reflect operations issued on it before this call. May
+ *                        be null, in which case the KV statistics are logged as 0.
+ * @param instance_id     instance of the transaction (bvar dimension)
+ * @param partition_count number of distinct partitions in the commit
+ * @param tablet_count    number of tablets in the commit
+ * @param txn_id          transaction id, used for logging only
+ */
+void record_txn_commit_stats(doris::cloud::Transaction* txn, const std::string& instance_id,
+                             int64_t partition_count, int64_t tablet_count, int64_t txn_id) {
+    int64_t kv_count = 0;
+    int64_t kv_bytes = 0;
+    // Do not let a missing txn prevent the partition/tablet bvars from being recorded.
+    if (txn != nullptr) {
+        kv_count = txn->num_put_keys() + txn->num_del_keys() + txn->num_get_keys();
+        kv_bytes = txn->get_bytes();
+    }
+    LOG(INFO) << "txn commit stats, instance_id: " << instance_id << ", txn_id: " << txn_id
+              << ", kv_count: " << kv_count << ", kv_bytes: " << kv_bytes
+              << ", partition_count: " << partition_count << ", tablet_count: " << tablet_count;
+    g_bvar_ms_txn_commit_with_partition_count << partition_count;
+    g_bvar_ms_txn_commit_with_tablet_count << tablet_count;
+    g_bvar_instance_txn_commit_with_partition_count.put({instance_id}, partition_count);
+    g_bvar_instance_txn_commit_with_tablet_count.put({instance_id}, tablet_count);
+}
+
} // namespace doris::cloud
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 4208eb7019f..0a68b019721 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -67,6 +67,9 @@ add_executable(network_util_test network_util_test.cpp)
add_executable(txn_lazy_commit_test txn_lazy_commit_test.cpp)
+
+# Unit test for the latency recorders declared in common/bvars.h.
+add_executable(bvars_test bvars_test.cpp)
+
message("Meta-service test dependencies: ${TEST_LINK_LIBS}")
#target_link_libraries(sync_point_test ${TEST_LINK_LIBS})
@@ -138,6 +141,10 @@ target_link_libraries(fdb_injection_test
${FDB_LINKER_FLAGS}
${TEST_LINK_LIBS})
+# Same link line as fdb_injection_test above.
+target_link_libraries(bvars_test ${FDB_LINKER_FLAGS} ${TEST_LINK_LIBS})
+
install(FILES
${BASE_DIR}/script/run_all_tests.sh
${BASE_DIR}/conf/fdb.cluster
diff --git a/cloud/test/bvars_test.cpp b/cloud/test/bvars_test.cpp
new file mode 100644
index 00000000000..8f895354f18
--- /dev/null
+++ b/cloud/test/bvars_test.cpp
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/bvars.h"
+
+#include <brpc/server.h>
+#include <bthread/bthread.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <chrono>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "common/logging.h"
+#include "meta-service/meta_service.h"
+#include "meta-store/mem_txn_kv.h"
+
+namespace doris::cloud {
+
+int main(int argc, char** argv) {
+ if (!cloud::init_glog("bvars_test")) {
+ std::cerr << "failed to init glog" << std::endl;
+ return -1;
+ }
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+// Fixture that starts a brpc server on an ephemeral port before each test and
+// shuts it down afterwards.
+// NOTE(review): only TEST_F(BvarsTest, ...) uses this fixture; TEST(BvarsTest, ...) does not.
+class BvarsTest : public ::testing::Test {
+public:
+    void SetUp() override {
+        const int rc = server.Start("0.0.0.0:0", &options);
+        if (rc == -1) {
+            perror("Start brpc server");
+        }
+    }
+
+    void TearDown() override {
+        server.Stop(0);
+        server.Join();
+    }
+
+    brpc::ServerOptions options;
+    brpc::Server server;
+};
+// Concurrent put()/get_*() on two multi-dimensional recorders. Checks that the
+// per-dimension timers keep publishing status values.
+TEST(BvarsTest, MultiThreadRecordMetrics) {
+    // Shrink the publication interval through the put() sync point, so the
+    // timers fire several times within seconds rather than minutes.
+    int interval_s = 1;
+    std::atomic_long update_count {0};
+
+    auto* sp = SyncPoint::get_instance();
+    sp->set_call_back("mBvarLatencyRecorderWithStatus::put", [&interval_s](auto&& args) {
+        auto* interval = try_any_cast<int*>(args[0]);
+        *interval = interval_s;
+    });
+    sp->set_call_back("mBvarLatencyRecorderWithStatus::update",
+                      [&update_count](auto&& /*args*/) { update_count++; });
+    sp->enable_processing();
+    // The callbacks capture locals by reference. Unregister them on every exit
+    // path, after the recorders below (declared later) have been destroyed.
+    struct SyncPointCleanup {
+        SyncPoint* sp;
+        ~SyncPointCleanup() {
+            sp->disable_processing();
+            sp->clear_all_call_backs();
+        }
+    } cleanup {sp};
+
+    MBvarLatencyRecorderWithStatus<60> mbvarlr_partition_test("partition_test", {"instance_id"});
+    MBvarLatencyRecorderWithStatus<60> mbvarlr_tablet_test("tablet_test", {"instance_id"});
+
+    constexpr int kThreadsPerRole = 20;
+    constexpr size_t kIterations = 1000;
+    std::vector<std::thread> threads;
+    threads.reserve(2 * kThreadsPerRole);
+
+    // Writers: spread values over 100 instance ids per recorder.
+    for (int i = 0; i < kThreadsPerRole; ++i) {
+        threads.emplace_back([&]() {
+            for (size_t j = 0; j < kIterations; j++) {
+                std::string instance_id = "instance_" + std::to_string(j % 100);
+                int64_t partition_count = 10 + j;
+                int64_t tablet_count = 50 + j;
+
+                mbvarlr_partition_test.put({instance_id}, partition_count);
+                mbvarlr_tablet_test.put({instance_id}, tablet_count);
+
+                std::this_thread::sleep_for(std::chrono::microseconds(1000));
+            }
+        });
+    }
+
+    // Readers: query the published status of the same dimensions concurrently.
+    for (int i = 0; i < kThreadsPerRole; ++i) {
+        threads.emplace_back([&]() {
+            for (size_t j = 0; j < kIterations; j++) {
+                std::string instance_id = "instance_" + std::to_string(j % 100);
+
+                mbvarlr_partition_test.get_count({instance_id});
+                mbvarlr_partition_test.get_avg({instance_id});
+                mbvarlr_partition_test.get_max({instance_id});
+                mbvarlr_tablet_test.get_count({instance_id});
+                mbvarlr_tablet_test.get_avg({instance_id});
+                mbvarlr_tablet_test.get_max({instance_id});
+
+                std::this_thread::sleep_for(std::chrono::microseconds(1000));
+            }
+        });
+    }
+
+    for (auto& t : threads) {
+        t.join();
+    }
+
+    // Give every updater a few more intervals to fire.
+    std::this_thread::sleep_for(std::chrono::seconds(interval_s * 3));
+
+    // 200 updaters (100 instance ids x 2 recorders); each one fires about once
+    // per interval_s after its first put().
+    ASSERT_GT(update_count.load(), 200);
+}
+
+} // namespace doris::cloud
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]