This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 1d3bded9 Support higher performance bvar with babylon counter (#3116)
1d3bded9 is described below
commit 1d3bded92a8de6db7d76e53901afec205501952d
Author: Bright Chen <[email protected]>
AuthorDate: Sun Oct 26 15:22:43 2025 +0800
Support higher performance bvar with babylon counter (#3116)
* Support higher performance bvar with babylon counter
* Update documents
---
.bazelrc | 3 +-
.github/pull_request_template.md | 2 +-
.github/workflows/ci-linux.yml | 9 ++
.github/workflows/ci-macos.yml | 1 -
BUILD.bazel | 10 +-
MODULE.bazel | 1 +
bazel/config/BUILD.bazel | 6 +
docs/cn/bvar_c++.md | 6 +
docs/en/bvar_c++.md | 6 +
src/butil/macros.h | 4 +-
src/butil/scoped_lock.h | 2 +-
src/bvar/detail/percentile.cpp | 37 +++++-
src/bvar/detail/percentile.h | 135 ++++++++++++++++---
src/bvar/latency_recorder.cpp | 2 +-
src/bvar/latency_recorder.h | 4 +-
src/bvar/recorder.h | 124 +++++++++++++++--
src/bvar/reducer.h | 273 +++++++++++++++++++++++++++++++++-----
src/bvar/variable.h | 10 ++
test/bvar_lock_timer_unittest.cpp | 9 ++
test/bvar_percentile_unittest.cpp | 4 +-
test/bvar_recorder_unittest.cpp | 2 +
test/bvar_reducer_unittest.cpp | 2 +-
22 files changed, 580 insertions(+), 72 deletions(-)
diff --git a/.bazelrc b/.bazelrc
index 3ffec2dc..2ee10dda 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -20,6 +20,7 @@ common --registry=https://bcr.bazel.build
common --registry=https://baidu.github.io/babylon/registry
build --cxxopt="-std=c++17"
+build --copt="-fno-omit-frame-pointer"
# Use gnu17 for asm keyword.
build --conlyopt="-std=gnu17"
@@ -33,8 +34,8 @@ build --features=per_object_debug_info
build --define absl=1
# For brpc.
-build --define=BRPC_WITH_GLOG=true
test --define=BRPC_BUILD_FOR_UNITTEST=true
+test --test_output=streamed
# Pass PATH, CC, CXX and LLVM_CONFIG variables from the environment.
build --action_env=CC
diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md
index 6cd036b6..aa227cd6 100644
--- a/.github/pull_request_template.md
+++ b/.github/pull_request_template.md
@@ -1,6 +1,6 @@
### What problem does this PR solve?
-Issue Number:
+Issue Number: resolve
Problem Summary:
diff --git a/.github/workflows/ci-linux.yml b/.github/workflows/ci-linux.yml
index 5d749a32..f2d6d692 100644
--- a/.github/workflows/ci-linux.yml
+++ b/.github/workflows/ci-linux.yml
@@ -225,3 +225,12 @@ jobs:
run: |
cd test
sh ./run_tests.sh
+
+ bazel-bvar-unittest:
+ runs-on: ubuntu-22.04
+ steps:
+ - uses: actions/checkout@v2
+ - run: bazel test --verbose_failures //test:bvar_test
+ - run: bazel test --verbose_failures --define with_babylon_counter=true
//test:bvar_test
+ - run: bazel test --verbose_failures --action_env=CC=clang //test:bvar_test
+ - run: bazel test --verbose_failures --action_env=CC=clang --define
with_babylon_counter=true //test:bvar_test
diff --git a/.github/workflows/ci-macos.yml b/.github/workflows/ci-macos.yml
index 92b70b5b..61d45ac8 100644
--- a/.github/workflows/ci-macos.yml
+++ b/.github/workflows/ci-macos.yml
@@ -61,7 +61,6 @@ jobs:
compile-with-bazel:
runs-on: macos-latest # https://github.com/actions/runner-images
-
steps:
- uses: actions/checkout@v2
- run: bazel build --verbose_failures -- //:brpc -//example/...
diff --git a/BUILD.bazel b/BUILD.bazel
index a3e7b29f..138e416b 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -21,6 +21,7 @@ licenses(["notice"]) # Apache v2
exports_files(["LICENSE"])
COPTS = [
+ "-fno-omit-frame-pointer",
"-DBTHREAD_USE_FAST_PTHREAD_MUTEX",
"-D__const__=__unused__",
"-D_GNU_SOURCE",
@@ -377,6 +378,10 @@ cc_library(
"src/bvar/utils/*.h",
"src/bvar/detail/*.h",
]),
+ defines = [] + select({
+ "//bazel/config:with_babylon_counter": ["WITH_BABYLON_COUNTER=1"],
+ "//conditions:default": [],
+ }),
copts = COPTS + select({
"//bazel/config:brpc_build_for_unittest": [
"-DBVAR_NOT_LINK_DEFAULT_VARIABLES",
@@ -391,7 +396,10 @@ cc_library(
visibility = ["//visibility:public"],
deps = [
":butil",
- ],
+ ] + select({
+ "//bazel/config:with_babylon_counter":
["@babylon//:concurrent_counter"],
+ "//conditions:default": [],
+ }),
)
cc_library(
diff --git a/MODULE.bazel b/MODULE.bazel
index fac96037..43223fb1 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -17,6 +17,7 @@ bazel_dep(name = 'rules_cc', version = '0.0.1')
bazel_dep(name = 'rules_proto', version = '4.0.0')
bazel_dep(name = 'zlib', version = '1.3.1.bcr.5', repo_name =
'com_github_madler_zlib')
bazel_dep(name = 'libunwind', version = '1.8.1', repo_name =
'com_github_libunwind_libunwind')
+bazel_dep(name = 'babylon', version = '1.4.4')
# --registry=https://baidu.github.io/babylon/registry
bazel_dep(name = 'leveldb', version = '1.23', repo_name =
'com_github_google_leveldb')
diff --git a/bazel/config/BUILD.bazel b/bazel/config/BUILD.bazel
index 594dc513..d08ea2ec 100644
--- a/bazel/config/BUILD.bazel
+++ b/bazel/config/BUILD.bazel
@@ -142,4 +142,10 @@ config_setting(
name = "brpc_with_no_pthread_mutex_hook",
define_values = {"BRPC_WITH_NO_PTHREAD_MUTEX_HOOK": "true"},
visibility = ["//visibility:public"],
+)
+
+config_setting(
+ name = "with_babylon_counter",
+ define_values = {"with_babylon_counter": "true"},
+ visibility = ["//visibility:public"],
)
\ No newline at end of file
diff --git a/docs/cn/bvar_c++.md b/docs/cn/bvar_c++.md
index 478335fe..ddc0de1b 100644
--- a/docs/cn/bvar_c++.md
+++ b/docs/cn/bvar_c++.md
@@ -650,3 +650,9 @@ static bvar::GFlag
s_gflag_my_flag_that_matters("my_flag_that_matters");
// Expose the gflag as a bvar named "foo_bar_my_flag_that_matters".
static bvar::GFlag s_gflag_my_flag_that_matters_with_prefix("foo_bar",
"my_flag_that_matters");
```
+# babylon counter bvar
+
+原理和性能见[babylon介绍](https://github.com/baidu/babylon/tree/main/example/use-counter-with-bvar)。
+
+目前只支持bazel编译方式:`--define with_babylon_counter=true`,babylon版本要求:>=
1.4.4。打开开关后,即可使用基于babylon counter实现的更高性能的bvar,无需修改代码。
+
diff --git a/docs/en/bvar_c++.md b/docs/en/bvar_c++.md
index 73fac6c4..160cdfcc 100644
--- a/docs/en/bvar_c++.md
+++ b/docs/en/bvar_c++.md
@@ -490,3 +490,9 @@ static bvar::GFlag
s_gflag_my_flag_that_matters("my_flag_that_matters");
// Expose the gflag as a bvar named "foo_bar_my_flag_that_matters".
static bvar::GFlag s_gflag_my_flag_that_matters_with_prefix("foo_bar",
"my_flag_that_matters");
```
+
+# babylon counter bvar
+
+For details on the principles and performance, see [Use concurrent counter to optimize bvar](https://github.com/baidu/babylon/tree/main/example/use-counter-with-bvar).
+
+Currently, this feature is only available when building with Bazel: pass `--define with_babylon_counter=true`. The required Babylon version is 1.4.4 or higher. Once enabled, bvar uses the higher-performance implementation based on the babylon counter without any changes to your code.
\ No newline at end of file
diff --git a/src/butil/macros.h b/src/butil/macros.h
index 63bb64de..d88a82d0 100644
--- a/src/butil/macros.h
+++ b/src/butil/macros.h
@@ -44,11 +44,11 @@
// Declarations for a class to be unassignable.
#define DISALLOW_ASSIGN(TypeName) \
- BUTIL_DELETE_FUNCTION(void operator=(const TypeName&))
+ BUTIL_DELETE_FUNCTION(TypeName& operator=(const TypeName&))
// Declarations for a class to be move-unassignable.
#define DISALLOW_MOVE_ASSIGN(TypeName) \
- BUTIL_DELETE_FUNCTION(void operator=(TypeName&&))
+ BUTIL_DELETE_FUNCTION(TypeName& operator=(TypeName&&))
// A macro to disallow the copy constructor and operator= functions.
#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
diff --git a/src/butil/scoped_lock.h b/src/butil/scoped_lock.h
index 45f52301..1111daab 100644
--- a/src/butil/scoped_lock.h
+++ b/src/butil/scoped_lock.h
@@ -378,7 +378,7 @@ private:
namespace butil {
-// Lock both lck1 and lck2 without the dead lock issue
+// Lock both lck1 and lck2 without the deadlock issue.
template <typename Mutex1, typename Mutex2>
void double_lock(std::unique_lock<Mutex1> &lck1, std::unique_lock<Mutex2>
&lck2) {
DCHECK(!lck1.owns_lock());
diff --git a/src/bvar/detail/percentile.cpp b/src/bvar/detail/percentile.cpp
index 37181cc3..99de328c 100644
--- a/src/bvar/detail/percentile.cpp
+++ b/src/bvar/detail/percentile.cpp
@@ -22,7 +22,7 @@
namespace bvar {
namespace detail {
-
+#if !WITH_BABYLON_COUNTER
inline uint32_t ones32(uint32_t x) {
/* 32-bit recursive reduction using SWAR...
* but first step is mapping 2-bit values
@@ -127,6 +127,41 @@ Percentile &Percentile::operator<<(int64_t latency) {
agent->merge_global(AddLatency(latency), _combiner);
return *this;
}
+#else
+// Merges every bucket of the concurrent sampler into a fresh value_type,
+// calls reset() on the sampler and returns the merged samples.
+// While visiting a bucket, a bucket whose capacity is still below SAMPLE_SIZE
+// but which recorded more values than its capacity gets its capacity raised
+// to min(SAMPLE_SIZE, 1.5 * recorded).
+Percentile::value_type Percentile::reset() {
+    constexpr static size_t SAMPLE_SIZE = value_type::SAMPLE_SIZE;
+    value_type merged;
+    _concurrent_sampler.for_each([&](
+            size_t index, const babylon::ConcurrentSampler::SampleBucket& bucket) {
+        merged.merge(bucket, index);
+        auto capacity = _concurrent_sampler.bucket_capacity(index);
+        auto recorded = bucket.record_num.load(::std::memory_order_relaxed);
+        const bool needs_growth = capacity < SAMPLE_SIZE && recorded > capacity;
+        if (!needs_growth) {
+            return;
+        }
+        capacity = std::min<size_t>(SAMPLE_SIZE, recorded * 1.5);
+        _concurrent_sampler.set_bucket_capacity(index, capacity);
+    });
+    _concurrent_sampler.reset();
+    return merged;
+}
+// Feeds one value into the concurrent sampler.
+// Negative values are not recorded; a rate-limited warning naming this
+// Percentile (by debug name if set, otherwise by address) is logged instead.
+Percentile& Percentile::operator<<(int64_t value) {
+    if (BAIDU_UNLIKELY(value < 0)) {
+        // we don't check overflow(of uint32) in percentile because the
+        // overflowed value which is included in last range does not affect
+        // overall distribution of other values too much.
+        if (_debug_name.empty()) {
+            LOG_EVERY_SECOND(WARNING) << "Input=" << value << " to Percentile("
+                                      << (void*)this << ") is negative, drop";
+        } else {
+            LOG_EVERY_SECOND(WARNING) << "Input=" << value << " to `" << _debug_name
+                                      << "' is negative, drop";
+        }
+        return *this;
+    }
+    _concurrent_sampler << value;
+    return *this;
+}
+#endif // WITH_BABYLON_COUNTER
} // namespace detail
} // namespace bvar
diff --git a/src/bvar/detail/percentile.h b/src/bvar/detail/percentile.h
index 186103b4..bac729f1 100644
--- a/src/bvar/detail/percentile.h
+++ b/src/bvar/detail/percentile.h
@@ -32,6 +32,9 @@
#include "bvar/detail/combiner.h" // AgentCombiner
#include "bvar/detail/sampler.h" // ReducerSampler
#include "butil/fast_rand.h"
+#if WITH_BABYLON_COUNTER
+#include "babylon/concurrent/counter.h"
+#endif // WITH_BABYLON_COUNTER
namespace bvar {
namespace detail {
@@ -139,6 +142,43 @@ public:
_num_added += rhs._num_added;
}
+#if WITH_BABYLON_COUNTER
+ // Merges the samples held by a babylon sampler bucket into this interval.
+ // Returns the number of values the bucket recorded (its record_num), or 0
+ // when the bucket recorded nothing, in which case this interval is untouched.
+ size_t merge(const babylon::ConcurrentSampler::SampleBucket& bucket) {
+ auto num_added = bucket.record_num.load(std::memory_order_acquire);
+ if (num_added == 0) {
+ return 0;
+ }
+ // Only min(record_num, capacity) entries of bucket.data are read below.
+ auto num_samples = std::min(num_added,
static_cast<uint32_t>(bucket.capacity));
+ // If there is space, deposit directly.
+ if (_num_samples + num_samples <= SAMPLE_SIZE) {
+ __builtin_memcpy(_samples + _num_samples, bucket.data,
+ sizeof(uint32_t) * num_samples);
+ _num_samples += num_samples;
+ } else {
+ // Sample probability weighting: the fraction of recorded values that
+ // are available as samples. Computed before num_samples is reduced below.
+ float ratio = static_cast<float>(num_samples) / num_added;
+ // Try to deposit directly first.
+ if (_num_samples < SAMPLE_SIZE) {
+ // Fill the free slots with the last copy_size samples of the bucket;
+ // the first num_samples (after the subtraction) are left for the loop.
+ auto copy_size = SAMPLE_SIZE - _num_samples;
+ num_samples -= copy_size;
+ __builtin_memcpy(_samples + _num_samples,
+ bucket.data + num_samples, sizeof(uint32_t) *
copy_size);
+ }
+ // The remaining samples are stored according to probability.
+ for (size_t i = 0; i < num_samples; ++i) {
+ // Pick a random slot in [0, (_num_added + i) * ratio]; the sample
+ // overwrites an existing one only if the slot lies inside the array.
+ auto index = butil::fast_rand() %
+ static_cast<uint64_t>((_num_added + i) * ratio + 1);
+ if (index < SAMPLE_SIZE) {
+ _samples[index] = bucket.data[i];
+ }
+ }
+ _num_samples = SAMPLE_SIZE;
+ }
+ _num_added += num_added;
+ return num_added;
+ }
+#endif // WITH_BABYLON_COUNTER
+
// Randomly pick n samples from mutable_rhs to |this|
template <size_t size2>
void merge_with_expectation(PercentileInterval<size2>& mutable_rhs, size_t
n) {
@@ -239,7 +279,9 @@ class AddLatency;
template <size_t SAMPLE_SIZE_IN>
class PercentileSamples {
public:
+#if !WITH_BABYLON_COUNTER
friend class AddLatency;
+#endif // WITH_BABYLON_COUNTER
static const size_t SAMPLE_SIZE = SAMPLE_SIZE_IN;
@@ -324,6 +366,12 @@ friend class AddLatency;
}
}
+#if WITH_BABYLON_COUNTER
+    // Merges a babylon sampler bucket into the interval at `index' and adds
+    // the count returned by that interval's merge() to _num_added.
+    void merge(const babylon::ConcurrentSampler::SampleBucket& bucket, size_t index) {
+        const size_t merged = get_interval_at(index).merge(bucket);
+        _num_added += merged;
+    }
+#endif // WITH_BABYLON_COUNTER
+
// Combine multiple into a single PercentileSamples
template <typename Iterator>
void combine_of(const Iterator& begin, const Iterator& end) {
@@ -443,31 +491,36 @@ std::ostream &operator<<(std::ostream &os, const
PercentileSamples<size> &p) {
typedef PercentileSamples<254> GlobalPercentileSamples;
typedef PercentileSamples<30> ThreadLocalPercentileSamples;
+namespace detail {
+// Combine operator for percentile samples: merges `src' into `dst'.
+// The two sides may use different sample sizes.
+struct AddPercentileSamples {
+    template <size_t dst_size, size_t src_size>
+    void operator()(PercentileSamples<dst_size>& dst,
+                    const PercentileSamples<src_size>& src) const {
+        dst.merge(src);
+    }
+};
+} // namespace detail
+
// A specialized reducer for finding the percentile of latencies.
// NOTE: DON'T use it directly, use LatencyRecorder instead.
+#if !WITH_BABYLON_COUNTER
class Percentile {
public:
- struct AddPercentileSamples {
- template <size_t size1, size_t size2>
- void operator()(PercentileSamples<size1> &b1,
- const PercentileSamples<size2> &b2) const {
- b1.merge(b2);
- }
- };
-
- typedef GlobalPercentileSamples value_type;
- typedef ReducerSampler<Percentile,
- GlobalPercentileSamples,
- AddPercentileSamples, VoidOp> sampler_type;
+ typedef GlobalPercentileSamples value_type;
+ typedef ReducerSampler<Percentile, GlobalPercentileSamples,
+ detail::AddPercentileSamples, VoidOp>sampler_type;
typedef AgentCombiner <GlobalPercentileSamples,
ThreadLocalPercentileSamples,
- AddPercentileSamples> combiner_type;
- typedef typename combiner_type::self_shared_type
shared_combiner_type;
- typedef combiner_type::Agent agent_type;
+ detail::AddPercentileSamples> combiner_type;
+ typedef combiner_type::self_shared_type shared_combiner_type;
+ typedef combiner_type::Agent agent_type;
+
Percentile();
~Percentile();
- AddPercentileSamples op() const { return AddPercentileSamples(); }
+ detail::AddPercentileSamples op() const {
+ return detail::AddPercentileSamples();
+ }
VoidOp inv_op() const { return VoidOp(); }
// The sampler for windows over percentile.
@@ -499,6 +552,56 @@ private:
sampler_type* _sampler;
std::string _debug_name;
};
+#else
+// Percentile built on babylon::ConcurrentSampler (WITH_BABYLON_COUNTER build).
+class Percentile {
+public:
+    typedef GlobalPercentileSamples value_type;
+    typedef detail::AddPercentileSamples AddPercentileSamples;
+    typedef AddPercentileSamples Op;
+    typedef VoidOp InvOp;
+    typedef ReducerSampler<Percentile, value_type, Op, InvOp> sampler_type;
+
+    Percentile() = default;
+    DISALLOW_COPY_AND_MOVE(Percentile);
+
+    // Destroys the sampler if get_sampler() ever created one.
+    ~Percentile() noexcept {
+        if (_sampler != NULL) {
+            _sampler->destroy();
+        }
+    }
+
+    Op op() const { return Op(); }
+    InvOp inv_op() const { return InvOp(); }
+
+    // Returns the sampler, creating and scheduling it on the first call.
+    sampler_type* get_sampler() {
+        if (_sampler != NULL) {
+            return _sampler;
+        }
+        _sampler = new sampler_type(this);
+        _sampler->schedule();
+        return _sampler;
+    }
+
+    // Collects the samples gathered so far and resets the underlying sampler.
+    value_type reset();
+
+    // Not expected to be called: logs an error and returns empty samples.
+    value_type get_value() const {
+        LOG_EVERY_SECOND(ERROR) << "Percentile should never call this get_value()";
+        return value_type();
+    }
+
+    // Records one value; negative values are dropped with a warning.
+    Percentile& operator<<(int64_t value);
+
+    bool valid() const { return true; }
+
+    // This name is useful for warning negative latencies in operator<<
+    void set_debug_name(const butil::StringPiece& name) {
+        _debug_name.assign(name.data(), name.size());
+    }
+
+private:
+    babylon::ConcurrentSampler _concurrent_sampler;
+    sampler_type* _sampler{NULL};
+    std::string _debug_name;
+};
+#endif // WITH_BABYLON_COUNTER
} // namespace detail
} // namespace bvar
diff --git a/src/bvar/latency_recorder.cpp b/src/bvar/latency_recorder.cpp
index f70894ae..a951376d 100644
--- a/src/bvar/latency_recorder.cpp
+++ b/src/bvar/latency_recorder.cpp
@@ -99,7 +99,7 @@ static int64_t double_to_random_int(double dval) {
}
static int64_t get_window_recorder_qps(void* arg) {
- detail::Sample<Stat> s;
+ Sample<Stat> s;
static_cast<RecorderWindow*>(arg)->get_span(&s);
// Use floating point to avoid overflow.
if (s.time_us <= 0) {
diff --git a/src/bvar/latency_recorder.h b/src/bvar/latency_recorder.h
index 5e702992..874b1178 100644
--- a/src/bvar/latency_recorder.h
+++ b/src/bvar/latency_recorder.h
@@ -38,7 +38,7 @@ typedef Window<Percentile, SERIES_IN_SECOND> PercentileWindow;
class CDF : public Variable {
public:
explicit CDF(PercentileWindow* w);
- ~CDF();
+ ~CDF() override;
void describe(std::ostream& os, bool quote_string) const override;
int describe_series(std::ostream& os, const SeriesOptions& options) const
override;
private:
@@ -77,7 +77,7 @@ class LatencyRecorder : public detail::LatencyRecorderBase {
public:
LatencyRecorder() : Base(-1) {}
explicit LatencyRecorder(time_t window_size) : Base(window_size) {}
- explicit LatencyRecorder(const butil::StringPiece& prefix) : Base(-1) {
+ LatencyRecorder(const butil::StringPiece& prefix) : Base(-1) {
expose(prefix);
}
LatencyRecorder(const butil::StringPiece& prefix,
diff --git a/src/bvar/recorder.h b/src/bvar/recorder.h
index 99bdfa75..b28b6372 100644
--- a/src/bvar/recorder.h
+++ b/src/bvar/recorder.h
@@ -27,6 +27,9 @@
#include "bvar/variable.h"
#include "bvar/window.h"
#include "bvar/detail/sampler.h"
+#if WITH_BABYLON_COUNTER
+#include "babylon/concurrent/counter.h"
+#endif // WITH_BABYLON_COUNTER
namespace bvar {
@@ -76,11 +79,22 @@ inline std::ostream& operator<<(std::ostream& os, const
Stat& s) {
}
}
+namespace detail {
+// Combine operator for Stat: accumulates `rhs' into `lhs'.
+struct AddStat {
+    void operator()(Stat& lhs, const Stat& rhs) const {
+        lhs += rhs;
+    }
+};
+
+// Inverse of AddStat: removes `rhs' from `lhs'.
+struct MinusStat {
+    void operator()(Stat& lhs, const Stat& rhs) const {
+        lhs -= rhs;
+    }
+};
+} // namespace detail
+
// For calculating average of numbers.
// Example:
// IntRecorder latency;
// latency << 1 << 3 << 5;
// CHECK_EQ(3, latency.average());
+#if !WITH_BABYLON_COUNTER
class IntRecorder : public Variable {
public:
// Compressing format:
@@ -92,16 +106,10 @@ public:
BAIDU_CASSERT(SUM_BIT_WIDTH > 32 && SUM_BIT_WIDTH < 64,
SUM_BIT_WIDTH_must_be_between_33_and_63);
- struct AddStat {
- void operator()(Stat& s1, const Stat& s2) const { s1 += s2; }
- };
- struct MinusStat {
- void operator()(Stat& s1, const Stat& s2) const { s1 -= s2; }
- };
-
typedef Stat value_type;
typedef detail::ReducerSampler<IntRecorder, Stat,
- AddStat, MinusStat> sampler_type;
+ detail::AddStat,
+ detail::MinusStat> sampler_type;
typedef Stat SampleSet;
@@ -118,7 +126,7 @@ public:
IntRecorder() : _combiner(std::make_shared<combiner_type>()),
_sampler(NULL) {}
- explicit IntRecorder(const butil::StringPiece& name) : IntRecorder() {
+ IntRecorder(const butil::StringPiece& name) : IntRecorder() {
expose(name);
}
@@ -154,8 +162,8 @@ public:
return _combiner->reset_all_agents();
}
- AddStat op() const { return AddStat(); }
- MinusStat inv_op() const { return MinusStat(); }
+ detail::AddStat op() const { return detail::AddStat(); }
+ detail::MinusStat inv_op() const { return detail::MinusStat(); }
void describe(std::ostream& os, bool /*quote_string*/) const override {
os << get_value();
@@ -286,6 +294,100 @@ inline IntRecorder& IntRecorder::operator<<(int64_t
sample) {
n, _compress(num + 1, sum + complement)));
return *this;
}
+#else // WITH_BABYLON_COUNTER
+// IntRecorder built on babylon::ConcurrentSummer (WITH_BABYLON_COUNTER build).
+class IntRecorder : public Variable {
+public:
+    typedef Stat value_type;
+    typedef detail::AddStat Op;
+    typedef detail::MinusStat InvOp;
+    typedef detail::ReducerSampler<IntRecorder, value_type, Op, InvOp> sampler_type;
+
+    COMMON_VARIABLE_CONSTRUCTOR(IntRecorder);
+
+    DISALLOW_COPY_AND_MOVE(IntRecorder);
+
+    // Hides the variable and destroys the sampler if one was created.
+    ~IntRecorder() override {
+        hide();
+        if (NULL != _sampler) {
+            _sampler->destroy();
+        }
+    }
+
+    // Records one sample.
+    // Note: The input type is actually int. Use int64_t to check overflow.
+    // A value outside the range of int is clamped to the max or min of int
+    // before being recorded, and a warning carrying the original input is
+    // logged.
+    IntRecorder& operator<<(int64_t value) {
+        if (BAIDU_UNLIKELY((int64_t)(int)value != value)) {
+            // Keep the caller's value for the log; `value' is clamped below.
+            const int64_t input = value;
+            const char* reason = NULL;
+            if (value > std::numeric_limits<int>::max()) {
+                reason = "overflows";
+                value = std::numeric_limits<int>::max();
+            } else {
+                reason = "underflows";
+                value = std::numeric_limits<int>::min();
+            }
+            if (!name().empty()) {
+                LOG(WARNING) << "Input=" << input << " to `" << name()
+                             << "\' " << reason;
+            } else if (!_debug_name.empty()) {
+                LOG(WARNING) << "Input=" << input << " to `" << _debug_name
+                             << "\' " << reason;
+            } else {
+                LOG(WARNING) << "Input=" << input << " to IntRecorder("
+                             << (void*)this << ") " << reason;
+            }
+        }
+
+        _summer << value;
+        return *this;
+    }
+
+    // Integer average of everything recorded so far.
+    int64_t average() const {
+        return get_value().get_average_int();
+    }
+
+    // Floating point average; the argument only selects this overload.
+    double average(double) const {
+        return get_value().get_average_double();
+    }
+
+    // Current sum and count as reported by the summer.
+    value_type get_value() const {
+        auto summary = _summer.value();
+        return value_type{summary.sum, static_cast<ssize_t>(summary.num)};
+    }
+
+    // Not supported in this build: logs an error and returns the current
+    // value without resetting anything.
+    value_type reset() {
+        LOG_EVERY_SECOND(ERROR) << "IntRecorder with babylon counter should never call reset()";
+        return get_value();
+    }
+
+    Op op() const { return Op(); }
+    InvOp inv_op() const { return InvOp(); }
+
+    void describe(::std::ostream& os, bool) const override {
+        os << get_value();
+    }
+
+    bool valid() const { return true; }
+
+    // Returns the sampler, creating and scheduling it on the first call.
+    sampler_type* get_sampler() {
+        if (NULL == _sampler) {
+            _sampler = new sampler_type(this);
+            _sampler->schedule();
+        }
+        return _sampler;
+    }
+
+    // This name is useful for printing overflow log in operator<< since
+    // IntRecorder is often used as the source of data and not exposed.
+    void set_debug_name(const butil::StringPiece& name) {
+        _debug_name.assign(name.data(), name.size());
+    }
+private:
+    babylon::ConcurrentSummer _summer;
+    sampler_type* _sampler{NULL};
+    std::string _debug_name;
+};
+#endif // WITH_BABYLON_COUNTER
} // namespace bvar
diff --git a/src/bvar/reducer.h b/src/bvar/reducer.h
index ccf78054..543e77c8 100644
--- a/src/bvar/reducer.h
+++ b/src/bvar/reducer.h
@@ -29,9 +29,133 @@
#include "bvar/detail/sampler.h" // ReducerSampler
#include "bvar/detail/series.h"
#include "bvar/window.h"
+#if WITH_BABYLON_COUNTER
+#include "babylon/concurrent/counter.h"
+#endif // WITH_BABYLON_COUNTER
namespace bvar {
+namespace detail {
+// Sampler that records the history of an owner's value: every take_sample()
+// appends owner->get_value() to a Series<T, Op>.
+//   O  - owner type, must provide get_value().
+//   T  - value type stored in the series.
+//   Op - operator handed to the series.
+template<typename O, typename T, typename Op>
+class SeriesSamplerImpl : public Sampler {
+public:
+    SeriesSamplerImpl(O* owner, const Op& op)
+        : _source(owner), _history(op) {}
+
+    void take_sample() override {
+        _history.append(_source->get_value());
+    }
+
+    // Writes the recorded series to `os'.
+    void describe(std::ostream& os) {
+        _history.describe(os, NULL);
+    }
+
+private:
+    O* _source;
+    Series<T, Op> _history;
+};
+
+#if WITH_BABYLON_COUNTER
+// Variable backed by a babylon concurrent counter.
+//   T       - value type accepted by operator<< and returned by get_value().
+//   Counter - counter type; used through `<<', value() and reset().
+//   Op      - combine operator, exposed via op() and given to the series sampler.
+//   InvOp   - inverse of Op, or VoidOp when Op has no inverse.
+template<typename T, typename Counter, typename Op, typename InvOp>
+class BabylonVariable: public Variable {
+public:
+    typedef ReducerSampler<BabylonVariable, T, Op, InvOp> sampler_type;
+    typedef SeriesSamplerImpl<BabylonVariable, T, Op> series_sampler_type;
+
+    BabylonVariable() = default;
+
+    // Counter cannot be built from a U: the argument is ignored and the
+    // counter is default-constructed.
+    template<typename U = T, typename std::enable_if<
+        !std::is_constructible<Counter, U>::value, bool>::type = false>
+    BabylonVariable(U) {}
+    // For Maxer.
+    // Counter can be built from a U: forward the default value to it.
+    template<typename U = T, typename std::enable_if<
+        std::is_constructible<Counter, U>::value, bool>::type = false>
+    BabylonVariable(U default_value) : _counter(default_value) {}
+
+    DISALLOW_COPY_AND_MOVE(BabylonVariable);
+
+    // Hides the variable and destroys whichever samplers were created.
+    ~BabylonVariable() override {
+        hide();
+        if (NULL != _sampler) {
+            _sampler->destroy();
+        }
+        if (NULL != _series_sampler) {
+            _series_sampler->destroy();
+        }
+    }
+
+    // Feeds one value into the counter.
+    BabylonVariable& operator<<(T value) {
+        _counter << value;
+        return *this;
+    }
+
+    // Returns the sampler, creating and scheduling it on the first call.
+    sampler_type* get_sampler() {
+        if (NULL == _sampler) {
+            _sampler = new sampler_type(this);
+            _sampler->schedule();
+        }
+        return _sampler;
+    }
+
+    // Current value of the counter.
+    T get_value() const {
+        return _counter.value();
+    }
+
+    // Returns the counter's value and then resets the counter.
+    // Only allowed when InvOp is VoidOp. For an operator that has an inverse
+    // a CHECK failure is reported and the current value is returned without
+    // resetting.
+    // Note that value() and reset() are two separate calls on the counter.
+    T reset() {
+        if (BAIDU_UNLIKELY((!butil::is_same<VoidOp, InvOp>::value))) {
+            CHECK(false) << "You should not call reset() of BabylonVariable<"
+                         << butil::class_name_str<T>() << ", "
+                         << butil::class_name_str<Op>()
+                         << "> because the operator has an inverse.";
+            return get_value();
+        }
+
+        T result = _counter.value();
+        _counter.reset();
+        return result;
+    }
+
+    bool valid() const { return true; }
+
+    const Op& op() const { return _op; }
+    const InvOp& inv_op() const { return _inv_op;}
+
+    // Prints the current value; string values are quoted when requested.
+    void describe(std::ostream& os, bool quote_string) const override {
+        if (butil::is_same<T, std::string>::value && quote_string) {
+            os << '"' << get_value() << '"';
+        } else {
+            os << get_value();
+        }
+    }
+
+    // Returns 1 when no series is being recorded, 0 otherwise. The series is
+    // written to `os' unless options.test_only is set.
+    int describe_series(std::ostream& os, const SeriesOptions& options) const override {
+        if (NULL == _series_sampler) {
+            return 1;
+        }
+        if (!options.test_only) {
+            _series_sampler->describe(os);
+        }
+        return 0;
+    }
+
+protected:
+    // Exposes the variable. On success, starts recording a series when
+    // FLAGS_save_series is on, Op has an inverse, T is not std::string and no
+    // series sampler exists yet.
+    int expose_impl(const butil::StringPiece& prefix,
+                    const butil::StringPiece& name,
+                    DisplayFilter display_filter) override {
+        const int rc = Variable::expose_impl(prefix, name, display_filter);
+        if (rc == 0 && NULL == _series_sampler &&
+            !butil::is_same<InvOp, VoidOp>::value &&
+            !butil::is_same<T, std::string>::value &&
+            FLAGS_save_series) {
+            _series_sampler = new series_sampler_type(this, _op);
+            _series_sampler->schedule();
+        }
+        return rc;
+    }
+
+private:
+    Counter _counter;
+    sampler_type* _sampler{NULL};
+    series_sampler_type* _series_sampler{NULL};
+    Op _op;
+    InvOp _inv_op;
+};
+#endif // WITH_BABYLON_COUNTER
+} // namespace detail
+
// Reduce multiple values into one with `Op': e1 Op e2 Op e3 ...
// `Op' shall satisfy:
// - associative: a Op (b Op c) == (a Op b) Op c
@@ -68,22 +192,12 @@ namespace bvar {
template <typename T, typename Op, typename InvOp = detail::VoidOp>
class Reducer : public Variable {
public:
- typedef typename detail::AgentCombiner<T, T, Op> combiner_type;
+ typedef detail::AgentCombiner<T, T, Op> combiner_type;
typedef typename combiner_type::self_shared_type shared_combiner_type;
typedef typename combiner_type::Agent agent_type;
typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type;
- class SeriesSampler : public detail::Sampler {
- public:
- SeriesSampler(Reducer* owner, const Op& op)
- : _owner(owner), _series(op) {}
- void take_sample() override { _series.append(_owner->get_value()); }
- void describe(std::ostream& os) { _series.describe(os, NULL); }
- private:
- Reducer* _owner;
- detail::Series<T, Op> _series;
- };
+ typedef detail::SeriesSamplerImpl<Reducer, T, Op> SeriesSampler;
-public:
// The `identity' must satisfy: identity Op a == a
explicit Reducer(typename butil::add_cr_non_integral<T>::type identity =
T(),
const Op& op = Op(), const InvOp& inv_op = InvOp())
@@ -205,34 +319,55 @@ inline Reducer<T, Op, InvOp>& Reducer<T, Op,
InvOp>::operator<<(
namespace detail {
template <typename Tp>
struct AddTo {
- void operator()(Tp & lhs,
+ void operator()(Tp & lhs,
typename butil::add_cr_non_integral<Tp>::type rhs) const
{ lhs += rhs; }
};
template <typename Tp>
struct MinusFrom {
- void operator()(Tp & lhs,
+ void operator()(Tp & lhs,
typename butil::add_cr_non_integral<Tp>::type rhs) const
{ lhs -= rhs; }
};
-}
-template <typename T>
+} // namespace detail
+
+template <typename T, typename = void>
class Adder : public Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > {
public:
typedef Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > Base;
typedef T value_type;
typedef typename Base::sampler_type sampler_type;
-public:
+
Adder() : Base() {}
- explicit Adder(const butil::StringPiece& name) : Base() {
+ Adder(const butil::StringPiece& name) : Base() {
this->expose(name);
}
Adder(const butil::StringPiece& prefix,
const butil::StringPiece& name) : Base() {
this->expose_as(prefix, name);
}
- ~Adder() { Variable::hide(); }
+ ~Adder() override { Variable::hide(); }
+};
+
+#if WITH_BABYLON_COUNTER
+// Adder for the numerical types supported by babylon counter, i.e. the types
+// for which babylon::GenericsConcurrentAdder<T> is default-constructible.
+// The second template argument has to be `typename std::enable_if<...>::type'
+// (void when the condition holds) so that it equals the primary template's
+// default argument; a bare `std::enable_if<...>' is a different type and the
+// specialization would never be selected for Adder<T>.
+template <typename T>
+class Adder<T, typename std::enable_if<
+        std::is_constructible<babylon::GenericsConcurrentAdder<T>>::value>::type>
+    : public detail::BabylonVariable<T, babylon::GenericsConcurrentAdder<T>,
+                                     detail::AddTo<T>, detail::MinusFrom<T>> {
+public:
+    typedef T value_type;
+private:
+    typedef detail::BabylonVariable<T, babylon::GenericsConcurrentAdder<T>,
+                                    detail::AddTo<value_type>,
+                                    detail::MinusFrom<value_type>> Base;
+public:
+    typedef detail::AddTo<value_type> Op;
+    typedef detail::MinusFrom<value_type> InvOp;
+    typedef typename Base::sampler_type sampler_type;
+
+    // Default, expose(name) and expose_as(prefix, name) constructors.
+    COMMON_VARIABLE_CONSTRUCTOR(Adder);
};
+#endif // WITH_BABYLON_COUNTER
// bvar::Maxer<int> max_value;
// max_value << 1 << 2 << 3 << 4;
@@ -248,17 +383,19 @@ struct MaxTo {
}
}
};
+
class LatencyRecorderBase;
-}
-template <typename T>
+} // namespace detail
+
+template <typename T, typename = void>
class Maxer : public Reducer<T, detail::MaxTo<T> > {
public:
typedef Reducer<T, detail::MaxTo<T> > Base;
typedef T value_type;
typedef typename Base::sampler_type sampler_type;
-public:
+
Maxer() : Base(std::numeric_limits<T>::min()) {}
- explicit Maxer(const butil::StringPiece& name)
+ Maxer(const butil::StringPiece& name)
: Base(std::numeric_limits<T>::min()) {
this->expose(name);
}
@@ -266,7 +403,8 @@ public:
: Base(std::numeric_limits<T>::min()) {
this->expose_as(prefix, name);
}
- ~Maxer() { Variable::hide(); }
+ ~Maxer() override { Variable::hide(); }
+
private:
friend class detail::LatencyRecorderBase;
// The following private functions are now used in LatencyRecorder,
@@ -283,32 +421,83 @@ private:
}
};
+#if WITH_BABYLON_COUNTER
+namespace detail {
+// babylon::GenericsConcurrentMaxer with a floor: value() never reports less
+// than the default value, and reports exactly the default value when the
+// base counter has nothing to report.
+// NOTE(review): a default-constructed instance uses 0 as the floor, while the
+// Reducer-based Maxer starts from std::numeric_limits<T>::min() - confirm
+// that the difference is intended.
+template <typename T>
+class ConcurrentMaxer : public babylon::GenericsConcurrentMaxer<T> {
+    typedef babylon::GenericsConcurrentMaxer<T> Base;
+public:
+    ConcurrentMaxer() = default;
+    ConcurrentMaxer(T default_value) : _default_value(default_value) {}
+
+    T value() const {
+        T observed;
+        if (Base::value(observed)) {
+            return std::max(observed, _default_value);
+        }
+        return _default_value;
+    }
+private:
+    T _default_value{0};
+};
+} // namespace detail
+
+// Maxer for the numerical types supported by babylon counter, i.e. the types
+// for which detail::ConcurrentMaxer<T> is default-constructible.
+// The second template argument has to be `typename std::enable_if<...>::type'
+// (void when the condition holds) so that it equals the primary template's
+// default argument; a bare `std::enable_if<...>' is a different type and the
+// specialization would never be selected for Maxer<T>.
+template <typename T>
+class Maxer<T, typename std::enable_if<
+        std::is_constructible<detail::ConcurrentMaxer<T>>::value>::type>
+    : public detail::BabylonVariable<T, detail::ConcurrentMaxer<T>,
+                                     detail::MaxTo<T>, detail::VoidOp> {
+public:
+    typedef T value_type;
+private:
+    typedef detail::BabylonVariable<T, detail::ConcurrentMaxer<T>,
+                                    detail::MaxTo<value_type>, detail::VoidOp> Base;
+public:
+    typedef detail::MaxTo<value_type> Op;
+    typedef detail::VoidOp InvOp;
+    typedef typename Base::sampler_type sampler_type;
+
+    // Default, expose(name) and expose_as(prefix, name) constructors.
+    COMMON_VARIABLE_CONSTRUCTOR(Maxer);
+
+private:
+friend class detail::LatencyRecorderBase;
+
+    // Constructors taking a default value; private, reachable only through
+    // the friend declaration above.
+    Maxer(T default_value) : Base(default_value) {}
+    Maxer(T default_value, const butil::StringPiece& prefix, const butil::StringPiece& name)
+        : Base(default_value) {
+        Variable::expose_as(prefix, name);
+    }
+    Maxer(T default_value, const butil::StringPiece& name)
+        : Base(default_value) {
+        Variable::expose(name);
+    }
+};
+#endif // WITH_BABYLON_COUNTER
+
// bvar::Miner<int> min_value;
// min_value << 1 << 2 << 3 << 4;
// LOG(INFO) << min_value.get_value(); // 1
namespace detail {
-
-template <typename Tp>
+template <typename Tp>
struct MinTo {
- void operator()(Tp & lhs,
+ void operator()(Tp & lhs,
typename butil::add_cr_non_integral<Tp>::type rhs) const {
if (rhs < lhs) {
lhs = rhs;
}
}
};
-
} // namespace detail
-template <typename T>
+template <typename T, typename = void>
class Miner : public Reducer<T, detail::MinTo<T> > {
public:
typedef Reducer<T, detail::MinTo<T> > Base;
typedef T value_type;
typedef typename Base::sampler_type sampler_type;
-public:
+
Miner() : Base(std::numeric_limits<T>::max()) {}
- explicit Miner(const butil::StringPiece& name)
+ Miner(const butil::StringPiece& name)
: Base(std::numeric_limits<T>::max()) {
this->expose(name);
}
@@ -316,8 +505,28 @@ public:
: Base(std::numeric_limits<T>::max()) {
this->expose_as(prefix, name);
}
- ~Miner() { Variable::hide(); }
+ ~Miner() override { Variable::hide(); }
+};
+
+#if WITH_BABYLON_COUNTER
+// Miner specialization backed by babylon::GenericsConcurrentMiner<T>.
+// It is meant to be chosen only when babylon::GenericsConcurrentMiner<T> is
+// default-constructible. The selector has to resolve to `void` (note the
+// `typename ... ::type`) so that it matches the primary template's defaulted
+// second argument; a bare `std::enable_if<...>` is a class type that never
+// equals `void`, which would leave this specialization unreachable.
+template <typename T>
+class Miner<T, typename std::enable_if<
+                   std::is_constructible<babylon::GenericsConcurrentMiner<T>>::value>::type>
+    : public detail::BabylonVariable<T, babylon::GenericsConcurrentMiner<T>,
+                                     detail::MinTo<T>, detail::VoidOp> {
+public:
+    typedef T value_type;
+private:
+    typedef detail::BabylonVariable<value_type, babylon::GenericsConcurrentMiner<T>,
+                                    detail::MinTo<value_type>, detail::VoidOp> Base;
+public:
+    typedef detail::MinTo<value_type> Op;
+    typedef detail::VoidOp InvOp;
+    typedef typename Base::sampler_type sampler_type;
+
+    // Default / (name) / (prefix, name) constructors.
+    COMMON_VARIABLE_CONSTRUCTOR(Miner);
};
+#endif // WITH_BABYLON_COUNTER
} // namespace bvar
diff --git a/src/bvar/variable.h b/src/bvar/variable.h
index 86e9cd0c..f01626fd 100644
--- a/src/bvar/variable.h
+++ b/src/bvar/variable.h
@@ -39,6 +39,16 @@ namespace bvar {
DECLARE_bool(save_series);
+// Declares the three constructors common to exposable variable classes.
+// Expand it inside the definition of class `TypeName`:
+//   TypeName()              -- defaulted, nothing is exposed;
+//   TypeName(name)          -- calls this->expose(name);
+//   TypeName(prefix, name)  -- calls this->expose_as(prefix, name).
+// The last line deliberately carries no continuation backslash, so the macro
+// ends at its closing brace and cannot swallow the line that follows it.
+#define COMMON_VARIABLE_CONSTRUCTOR(TypeName)                                    \
+    TypeName() = default;                                                        \
+    TypeName(const butil::StringPiece& name) {                                   \
+        this->expose(name);                                                      \
+    }                                                                            \
+    TypeName(const butil::StringPiece& prefix, const butil::StringPiece& name) { \
+        this->expose_as(prefix, name);                                           \
+    }
+
+
// Bitwise masks of displayable targets
enum DisplayFilter {
DISPLAY_ON_HTML = 1,
diff --git a/test/bvar_lock_timer_unittest.cpp
b/test/bvar_lock_timer_unittest.cpp
index f69ba3a2..c4299b55 100644
--- a/test/bvar_lock_timer_unittest.cpp
+++ b/test/bvar_lock_timer_unittest.cpp
@@ -197,8 +197,12 @@ TEST_F(LockTimerTest, double_lock_time) {
ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads), (size_t)r1.count());
LOG(INFO) << r0;
LOG(INFO) << r1._latency;
+#if !WITH_BABYLON_COUNTER
+ // reset() does not work for an IntRecorder backed by the babylon counter,
+ // so it must never be called in that configuration.
r0.reset();
r1._latency.reset();
+#endif // !WITH_BABYLON_COUNTER
DoubleLockArg<M1, M0> arg1;
arg1.m0.set_recorder(r1);
arg1.m1.set_recorder(r0);
@@ -209,8 +213,13 @@ TEST_F(LockTimerTest, double_lock_time) {
for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
pthread_join(threads[i], NULL);
}
+#if !WITH_BABYLON_COUNTER
ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads),
(size_t)r0.get_value().num);
ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads), (size_t)r1.count());
+#else
+ ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads) * 2,
(size_t)r0.get_value().num);
+ ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads) * 2, (size_t)r1.count());
+#endif // !WITH_BABYLON_COUNTER
LOG(INFO) << r0;
LOG(INFO) << r1._latency;
}
diff --git a/test/bvar_percentile_unittest.cpp
b/test/bvar_percentile_unittest.cpp
index 696e2a5c..f647e272 100644
--- a/test/bvar_percentile_unittest.cpp
+++ b/test/bvar_percentile_unittest.cpp
@@ -137,6 +137,7 @@ TEST_F(PercentileTest, merge2) {
<< " belong_to_b2=" << belong_to_b2;
}
+#if !WITH_BABYLON_COUNTER
TEST_F(PercentileTest, combine_of) {
// Combine multiple percentle samplers into one
const int num_samplers = 10;
@@ -146,7 +147,7 @@ TEST_F(PercentileTest, combine_of) {
const int N = 1000;
size_t belongs[num_samplers] = {0};
size_t total = 0;
- for (int repeat = 0; repeat < 100; ++repeat) {
+ for (int repeat = 0; repeat < 1; ++repeat) {
bvar::detail::Percentile p[num_samplers];
for (int i = 0; i < num_samplers; ++i) {
for (int j = 0; j < N * (i + 1); ++j) {
@@ -186,3 +187,4 @@ TEST_F(PercentileTest, combine_of) {
}
}
+#endif // !WITH_BABYLON_COUNTER
diff --git a/test/bvar_recorder_unittest.cpp b/test/bvar_recorder_unittest.cpp
index 659db01b..c0d32062 100644
--- a/test/bvar_recorder_unittest.cpp
+++ b/test/bvar_recorder_unittest.cpp
@@ -29,6 +29,7 @@
#include <gtest/gtest.h>
namespace {
+#if !WITH_BABYLON_COUNTER
TEST(RecorderTest, test_complement) {
LOG(INFO) << "sizeof(LatencyRecorder)=" << sizeof(bvar::LatencyRecorder)
<< " " << sizeof(bvar::detail::Percentile)
@@ -61,6 +62,7 @@ TEST(RecorderTest, test_compress_negtive_number) {
ASSERT_EQ(a,
bvar::IntRecorder::_extend_sign_bit(bvar::IntRecorder::_get_sum(compressed)));
}
}
+#endif // !WITH_BABYLON_COUNTER
TEST(RecorderTest, sanity) {
{
diff --git a/test/bvar_reducer_unittest.cpp b/test/bvar_reducer_unittest.cpp
index 45e42cd3..48e13b3c 100644
--- a/test/bvar_reducer_unittest.cpp
+++ b/test/bvar_reducer_unittest.cpp
@@ -248,8 +248,8 @@ void ReducerTest_window() {
}
TEST_F(ReducerTest, window) {
-#if !BRPC_WITH_GLOG
ReducerTest_window();
+#if !BRPC_WITH_GLOG
logging::StringSink log_str;
logging::LogSink* old_sink = logging::SetLogSink(&log_str);
sleep(1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]