This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d3bded9 Support higher performance bvar with babylon counter (#3116)
1d3bded9 is described below

commit 1d3bded92a8de6db7d76e53901afec205501952d
Author: Bright Chen <[email protected]>
AuthorDate: Sun Oct 26 15:22:43 2025 +0800

    Support higher performance bvar with babylon counter (#3116)
    
    * Support higher performance bvar with babylon counter
    
    * Update documents
---
 .bazelrc                          |   3 +-
 .github/pull_request_template.md  |   2 +-
 .github/workflows/ci-linux.yml    |   9 ++
 .github/workflows/ci-macos.yml    |   1 -
 BUILD.bazel                       |  10 +-
 MODULE.bazel                      |   1 +
 bazel/config/BUILD.bazel          |   6 +
 docs/cn/bvar_c++.md               |   6 +
 docs/en/bvar_c++.md               |   6 +
 src/butil/macros.h                |   4 +-
 src/butil/scoped_lock.h           |   2 +-
 src/bvar/detail/percentile.cpp    |  37 +++++-
 src/bvar/detail/percentile.h      | 135 ++++++++++++++++---
 src/bvar/latency_recorder.cpp     |   2 +-
 src/bvar/latency_recorder.h       |   4 +-
 src/bvar/recorder.h               | 124 +++++++++++++++--
 src/bvar/reducer.h                | 273 +++++++++++++++++++++++++++++++++-----
 src/bvar/variable.h               |  10 ++
 test/bvar_lock_timer_unittest.cpp |   9 ++
 test/bvar_percentile_unittest.cpp |   4 +-
 test/bvar_recorder_unittest.cpp   |   2 +
 test/bvar_reducer_unittest.cpp    |   2 +-
 22 files changed, 580 insertions(+), 72 deletions(-)

diff --git a/.bazelrc b/.bazelrc
index 3ffec2dc..2ee10dda 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -20,6 +20,7 @@ common --registry=https://bcr.bazel.build
 common --registry=https://baidu.github.io/babylon/registry
 
 build --cxxopt="-std=c++17"
+build --copt="-fno-omit-frame-pointer"
 # Use gnu17 for asm keyword.
 build --conlyopt="-std=gnu17"
 
@@ -33,8 +34,8 @@ build --features=per_object_debug_info
 build --define absl=1
 
 # For brpc.
-build --define=BRPC_WITH_GLOG=true
 test --define=BRPC_BUILD_FOR_UNITTEST=true
+test --test_output=streamed
 
 # Pass PATH, CC, CXX and LLVM_CONFIG variables from the environment.
 build --action_env=CC
diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md
index 6cd036b6..aa227cd6 100644
--- a/.github/pull_request_template.md
+++ b/.github/pull_request_template.md
@@ -1,6 +1,6 @@
 ### What problem does this PR solve?
 
-Issue Number:
+Issue Number: resolve 
 
 Problem Summary:
 
diff --git a/.github/workflows/ci-linux.yml b/.github/workflows/ci-linux.yml
index 5d749a32..f2d6d692 100644
--- a/.github/workflows/ci-linux.yml
+++ b/.github/workflows/ci-linux.yml
@@ -225,3 +225,12 @@ jobs:
       run: |
         cd test
         sh ./run_tests.sh
+
+  bazel-bvar-unittest:
+    runs-on: ubuntu-22.04
+    steps:
+    - uses: actions/checkout@v2
+    - run: bazel test --verbose_failures //test:bvar_test
+    - run: bazel test --verbose_failures --define with_babylon_counter=true 
//test:bvar_test
+    - run: bazel test --verbose_failures --action_env=CC=clang //test:bvar_test
+    - run: bazel test --verbose_failures --action_env=CC=clang --define 
with_babylon_counter=true //test:bvar_test
diff --git a/.github/workflows/ci-macos.yml b/.github/workflows/ci-macos.yml
index 92b70b5b..61d45ac8 100644
--- a/.github/workflows/ci-macos.yml
+++ b/.github/workflows/ci-macos.yml
@@ -61,7 +61,6 @@ jobs:
 
   compile-with-bazel:
     runs-on: macos-latest # https://github.com/actions/runner-images
-
     steps:
       - uses: actions/checkout@v2
       - run: bazel build --verbose_failures -- //:brpc -//example/...
diff --git a/BUILD.bazel b/BUILD.bazel
index a3e7b29f..138e416b 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -21,6 +21,7 @@ licenses(["notice"])  # Apache v2
 exports_files(["LICENSE"])
 
 COPTS = [
+    "-fno-omit-frame-pointer",
     "-DBTHREAD_USE_FAST_PTHREAD_MUTEX",
     "-D__const__=__unused__",
     "-D_GNU_SOURCE",
@@ -377,6 +378,10 @@ cc_library(
         "src/bvar/utils/*.h",
         "src/bvar/detail/*.h",
     ]),
+    defines = [] + select({
+        "//bazel/config:with_babylon_counter": ["WITH_BABYLON_COUNTER=1"],
+        "//conditions:default": [],
+    }),
     copts = COPTS + select({
         "//bazel/config:brpc_build_for_unittest": [
             "-DBVAR_NOT_LINK_DEFAULT_VARIABLES",
@@ -391,7 +396,10 @@ cc_library(
     visibility = ["//visibility:public"],
     deps = [
         ":butil",
-    ],
+    ]  + select({
+        "//bazel/config:with_babylon_counter": 
["@babylon//:concurrent_counter"],
+        "//conditions:default": [],
+    }),
 )
 
 cc_library(
diff --git a/MODULE.bazel b/MODULE.bazel
index fac96037..43223fb1 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -17,6 +17,7 @@ bazel_dep(name = 'rules_cc', version = '0.0.1')
 bazel_dep(name = 'rules_proto', version = '4.0.0')
 bazel_dep(name = 'zlib', version = '1.3.1.bcr.5', repo_name = 
'com_github_madler_zlib')
 bazel_dep(name = 'libunwind', version = '1.8.1', repo_name = 
'com_github_libunwind_libunwind')
+bazel_dep(name = 'babylon', version = '1.4.4')
 
 # --registry=https://baidu.github.io/babylon/registry
 bazel_dep(name = 'leveldb', version = '1.23', repo_name = 
'com_github_google_leveldb')
diff --git a/bazel/config/BUILD.bazel b/bazel/config/BUILD.bazel
index 594dc513..d08ea2ec 100644
--- a/bazel/config/BUILD.bazel
+++ b/bazel/config/BUILD.bazel
@@ -142,4 +142,10 @@ config_setting(
     name = "brpc_with_no_pthread_mutex_hook",
     define_values = {"BRPC_WITH_NO_PTHREAD_MUTEX_HOOK": "true"},
     visibility = ["//visibility:public"],
+)
+
+config_setting(
+    name = "with_babylon_counter",
+    define_values = {"with_babylon_counter": "true"},
+    visibility = ["//visibility:public"],
 )
\ No newline at end of file
diff --git a/docs/cn/bvar_c++.md b/docs/cn/bvar_c++.md
index 478335fe..ddc0de1b 100644
--- a/docs/cn/bvar_c++.md
+++ b/docs/cn/bvar_c++.md
@@ -650,3 +650,9 @@ static bvar::GFlag 
s_gflag_my_flag_that_matters("my_flag_that_matters");
 // Expose the gflag as a bvar named "foo_bar_my_flag_that_matters".
 static bvar::GFlag s_gflag_my_flag_that_matters_with_prefix("foo_bar", 
"my_flag_that_matters");
 ```
+# babylon counter bvar
+
+原理和性能见[babylon介绍](https://github.com/baidu/babylon/tree/main/example/use-counter-with-bvar)。
+
+目前只支持bazel编译方式:`--define with_babylon_counter=true`,babylon版本要求:>= 
1.4.4。打开开关后,即可使用基于babylon counter实现的更高性能的bvar,无需修改代码。
+
diff --git a/docs/en/bvar_c++.md b/docs/en/bvar_c++.md
index 73fac6c4..160cdfcc 100644
--- a/docs/en/bvar_c++.md
+++ b/docs/en/bvar_c++.md
@@ -490,3 +490,9 @@ static bvar::GFlag 
s_gflag_my_flag_that_matters("my_flag_that_matters");
 // Expose the gflag as a bvar named "foo_bar_my_flag_that_matters".
 static bvar::GFlag s_gflag_my_flag_that_matters_with_prefix("foo_bar", 
"my_flag_that_matters");
 ```
+
+# babylon counter bvar
+
+For details on the principles and performance, see [Use concurrent counter 
optimize 
bvar](https://github.com/baidu/babylon/tree/main/example/use-counter-with-bvar).
+
+Currently, this feature is only supported by the Bazel compilation method: 
`--define with_babylon_counter=true`. The required Babylon version is 1.4.4 or 
higher. Once enabled, you can use the higher-performance bvar implementation 
based on the babylon counter without modifying your code.
\ No newline at end of file
diff --git a/src/butil/macros.h b/src/butil/macros.h
index 63bb64de..d88a82d0 100644
--- a/src/butil/macros.h
+++ b/src/butil/macros.h
@@ -44,11 +44,11 @@
 
 // Declarations for a class to be unassignable.
 #define DISALLOW_ASSIGN(TypeName)                        \
-    BUTIL_DELETE_FUNCTION(void operator=(const TypeName&))
+    BUTIL_DELETE_FUNCTION(TypeName& operator=(const TypeName&))
 
 // Declarations for a class to be move-unassignable.
 #define DISALLOW_MOVE_ASSIGN(TypeName)                   \
-    BUTIL_DELETE_FUNCTION(void operator=(TypeName&&))
+    BUTIL_DELETE_FUNCTION(TypeName& operator=(TypeName&&))
 
 // A macro to disallow the copy constructor and operator= functions.
 #define DISALLOW_COPY_AND_ASSIGN(TypeName)               \
diff --git a/src/butil/scoped_lock.h b/src/butil/scoped_lock.h
index 45f52301..1111daab 100644
--- a/src/butil/scoped_lock.h
+++ b/src/butil/scoped_lock.h
@@ -378,7 +378,7 @@ private:
 
 namespace butil {
 
-// Lock both lck1 and lck2 without the dead lock issue
+// Lock both lck1 and lck2 without the deadlock issue.
 template <typename Mutex1, typename Mutex2>
 void double_lock(std::unique_lock<Mutex1> &lck1, std::unique_lock<Mutex2> 
&lck2) {
     DCHECK(!lck1.owns_lock());
diff --git a/src/bvar/detail/percentile.cpp b/src/bvar/detail/percentile.cpp
index 37181cc3..99de328c 100644
--- a/src/bvar/detail/percentile.cpp
+++ b/src/bvar/detail/percentile.cpp
@@ -22,7 +22,7 @@
 
 namespace bvar {
 namespace detail {
-
+#if !WITH_BABYLON_COUNTER
 inline uint32_t ones32(uint32_t x) {
     /* 32-bit recursive reduction using SWAR...
      * but first step is mapping 2-bit values
@@ -127,6 +127,41 @@ Percentile &Percentile::operator<<(int64_t latency) {
     agent->merge_global(AddLatency(latency), _combiner);
     return *this;
 }
+#else
+Percentile::value_type Percentile::reset() {
+    constexpr static size_t SAMPLE_SIZE = value_type::SAMPLE_SIZE;
+    value_type result;
+    _concurrent_sampler.for_each([&](
+        size_t index, const babylon::ConcurrentSampler::SampleBucket& bucket) {
+        result.merge(bucket, index);
+        auto capacity = _concurrent_sampler.bucket_capacity(index);
+        auto num_added = bucket.record_num.load(::std::memory_order_relaxed);
+        if (capacity < SAMPLE_SIZE && num_added > capacity) {
+            capacity = std::min<size_t>(SAMPLE_SIZE, num_added * 1.5);
+            _concurrent_sampler.set_bucket_capacity(index, capacity);
+        }
+    });
+    _concurrent_sampler.reset();
+    return result;
+}
 
+Percentile& Percentile::operator<<(int64_t value) {
+    if (BAIDU_UNLIKELY(value < 0)) {
+        // we don't check overflow(of uint32) in percentile because the
+        // overflowed value which is included in last range does not affect
+        // overall distribution of other values too much.
+        if (!_debug_name.empty()) {
+            LOG_EVERY_SECOND(WARNING) << "Input=" << value << " to `" << 
_debug_name
+                                      << "' is negative, drop";
+        } else {
+            LOG_EVERY_SECOND(WARNING) << "Input=" << value << " to Percentile("
+                                      << (void*)this << ") is negative, drop";
+        }
+    } else {
+        _concurrent_sampler << value;
+    }
+    return *this;
+}
+#endif // WITH_BABYLON_COUNTER
 }  // namespace detail
 }  // namespace bvar
diff --git a/src/bvar/detail/percentile.h b/src/bvar/detail/percentile.h
index 186103b4..bac729f1 100644
--- a/src/bvar/detail/percentile.h
+++ b/src/bvar/detail/percentile.h
@@ -32,6 +32,9 @@
 #include "bvar/detail/combiner.h"       // AgentCombiner
 #include "bvar/detail/sampler.h"        // ReducerSampler
 #include "butil/fast_rand.h"
+#if WITH_BABYLON_COUNTER
+#include "babylon/concurrent/counter.h"
+#endif // WITH_BABYLON_COUNTER
 
 namespace bvar {
 namespace detail {
@@ -139,6 +142,43 @@ public:
         _num_added += rhs._num_added;
     }
 
+#if WITH_BABYLON_COUNTER
+    size_t merge(const babylon::ConcurrentSampler::SampleBucket& bucket) {
+        auto num_added = bucket.record_num.load(std::memory_order_acquire);
+        if (num_added == 0) {
+            return 0;
+        }
+        auto num_samples = std::min(num_added, 
static_cast<uint32_t>(bucket.capacity));
+        // If there is space, deposit directly.
+        if (_num_samples + num_samples <= SAMPLE_SIZE) {
+            __builtin_memcpy(_samples + _num_samples, bucket.data,
+                             sizeof(uint32_t) * num_samples);
+            _num_samples += num_samples;
+        } else {
+            // Sample probability weighting.
+            float ratio = static_cast<float>(num_samples) / num_added;
+            // Try to deposit directly first.
+            if (_num_samples < SAMPLE_SIZE) {
+                auto copy_size = SAMPLE_SIZE - _num_samples;
+                num_samples -= copy_size;
+                __builtin_memcpy(_samples + _num_samples,
+                                 bucket.data + num_samples, sizeof(uint32_t) * 
copy_size);
+            }
+            // The remaining samples are stored according to probability.
+            for (size_t i = 0; i < num_samples; ++i) {
+                auto index = butil::fast_rand() %
+                    static_cast<uint64_t>((_num_added + i) * ratio + 1);
+                if (index < SAMPLE_SIZE) {
+                    _samples[index] = bucket.data[i];
+                }
+            }
+            _num_samples = SAMPLE_SIZE;
+        }
+        _num_added += num_added;
+        return num_added;
+    }
+#endif // WITH_BABYLON_COUNTER
+
     // Randomly pick n samples from mutable_rhs to |this|
     template <size_t size2>
     void merge_with_expectation(PercentileInterval<size2>& mutable_rhs, size_t 
n) {
@@ -239,7 +279,9 @@ class AddLatency;
 template <size_t SAMPLE_SIZE_IN>
 class PercentileSamples {
 public:
+#if !WITH_BABYLON_COUNTER
 friend class AddLatency;
+#endif // WITH_BABYLON_COUNTER
 
     static const size_t SAMPLE_SIZE = SAMPLE_SIZE_IN;
     
@@ -324,6 +366,12 @@ friend class AddLatency;
         }
     }
 
+#if WITH_BABYLON_COUNTER
+    void merge(const babylon::ConcurrentSampler::SampleBucket& bucket, size_t 
index) {
+        _num_added += get_interval_at(index).merge(bucket);
+    }
+#endif // WITH_BABYLON_COUNTER
+
     // Combine multiple into a single PercentileSamples
     template <typename Iterator>
     void combine_of(const Iterator& begin, const Iterator& end) {
@@ -443,31 +491,36 @@ std::ostream &operator<<(std::ostream &os, const 
PercentileSamples<size> &p) {
 typedef PercentileSamples<254> GlobalPercentileSamples;
 typedef PercentileSamples<30> ThreadLocalPercentileSamples;
 
+namespace detail {
+struct AddPercentileSamples {
+    template <size_t size1, size_t size2>
+    void operator()(PercentileSamples<size1> &b1,
+                    const PercentileSamples<size2> &b2) const {
+        b1.merge(b2);
+    }
+};
+} // namespace detail
+
 // A specialized reducer for finding the percentile of latencies.
 // NOTE: DON'T use it directly, use LatencyRecorder instead.
+#if !WITH_BABYLON_COUNTER
 class Percentile {
 public:
-    struct AddPercentileSamples {
-        template <size_t size1, size_t size2>
-        void operator()(PercentileSamples<size1> &b1, 
-                        const PercentileSamples<size2> &b2) const {
-            b1.merge(b2);
-        }
-    };
-
-    typedef GlobalPercentileSamples                         value_type;
-    typedef ReducerSampler<Percentile, 
-                           GlobalPercentileSamples,
-                           AddPercentileSamples, VoidOp>    sampler_type;
+    typedef GlobalPercentileSamples value_type;
+    typedef ReducerSampler<Percentile, GlobalPercentileSamples,
+                           detail::AddPercentileSamples, VoidOp>sampler_type;
     typedef AgentCombiner <GlobalPercentileSamples,
                            ThreadLocalPercentileSamples,
-                           AddPercentileSamples>            combiner_type;
-    typedef typename combiner_type::self_shared_type        
shared_combiner_type;
-    typedef combiner_type::Agent                            agent_type;
+                           detail::AddPercentileSamples> combiner_type;
+    typedef combiner_type::self_shared_type shared_combiner_type;
+    typedef combiner_type::Agent agent_type;
+
     Percentile();
     ~Percentile();
 
-    AddPercentileSamples op() const { return AddPercentileSamples(); }
+    detail::AddPercentileSamples op() const {
+        return detail::AddPercentileSamples();
+    }
     VoidOp inv_op() const { return VoidOp(); }
 
     // The sampler for windows over percentile.
@@ -499,6 +552,56 @@ private:
     sampler_type* _sampler;
     std::string _debug_name;
 };
+#else
+class Percentile {
+public:
+    typedef GlobalPercentileSamples value_type;
+    typedef detail::AddPercentileSamples AddPercentileSamples;
+    typedef AddPercentileSamples Op;
+    typedef VoidOp InvOp;
+    typedef ReducerSampler<Percentile, value_type, Op, InvOp> sampler_type;
+
+    Percentile() = default;
+    DISALLOW_COPY_AND_MOVE(Percentile);
+    ~Percentile() noexcept {
+        if (NULL != _sampler) {
+            _sampler->destroy();
+        }
+    }
+
+    Op op() const { return Op(); }
+    InvOp inv_op() const { return InvOp(); }
+
+    sampler_type* get_sampler() {
+        if (NULL == _sampler) {
+            _sampler = new sampler_type(this);
+            _sampler->schedule();
+        }
+        return _sampler;
+    }
+
+    value_type reset();
+
+    value_type get_value() const {
+        LOG_EVERY_SECOND(ERROR) << "Percentile should never call this 
get_value()";
+        return value_type();
+    }
+
+    Percentile& operator<<(int64_t value);
+
+    bool valid() const { return true; }
+
+    // This name is useful for warning negative latencies in operator<<
+    void set_debug_name(const butil::StringPiece& name) {
+        _debug_name.assign(name.data(), name.size());
+    }
+
+private:
+    babylon::ConcurrentSampler _concurrent_sampler;
+    sampler_type* _sampler{NULL};
+    std::string _debug_name;
+};
+#endif // WITH_BABYLON_COUNTER
 
 }  // namespace detail
 }  // namespace bvar
diff --git a/src/bvar/latency_recorder.cpp b/src/bvar/latency_recorder.cpp
index f70894ae..a951376d 100644
--- a/src/bvar/latency_recorder.cpp
+++ b/src/bvar/latency_recorder.cpp
@@ -99,7 +99,7 @@ static int64_t double_to_random_int(double dval) {
 }
 
 static int64_t get_window_recorder_qps(void* arg) {
-    detail::Sample<Stat> s;
+    Sample<Stat> s;
     static_cast<RecorderWindow*>(arg)->get_span(&s);
     // Use floating point to avoid overflow.
     if (s.time_us <= 0) {
diff --git a/src/bvar/latency_recorder.h b/src/bvar/latency_recorder.h
index 5e702992..874b1178 100644
--- a/src/bvar/latency_recorder.h
+++ b/src/bvar/latency_recorder.h
@@ -38,7 +38,7 @@ typedef Window<Percentile, SERIES_IN_SECOND> PercentileWindow;
 class CDF : public Variable {
 public:
     explicit CDF(PercentileWindow* w);
-    ~CDF();
+    ~CDF() override;
     void describe(std::ostream& os, bool quote_string) const override;
     int describe_series(std::ostream& os, const SeriesOptions& options) const 
override;
 private:
@@ -77,7 +77,7 @@ class LatencyRecorder : public detail::LatencyRecorderBase {
 public:
     LatencyRecorder() : Base(-1) {}
     explicit LatencyRecorder(time_t window_size) : Base(window_size) {}
-    explicit LatencyRecorder(const butil::StringPiece& prefix) : Base(-1) {
+    LatencyRecorder(const butil::StringPiece& prefix) : Base(-1) {
         expose(prefix);
     }
     LatencyRecorder(const butil::StringPiece& prefix,
diff --git a/src/bvar/recorder.h b/src/bvar/recorder.h
index 99bdfa75..b28b6372 100644
--- a/src/bvar/recorder.h
+++ b/src/bvar/recorder.h
@@ -27,6 +27,9 @@
 #include "bvar/variable.h"
 #include "bvar/window.h"
 #include "bvar/detail/sampler.h"
+#if WITH_BABYLON_COUNTER
+#include "babylon/concurrent/counter.h"
+#endif // WITH_BABYLON_COUNTER
 
 namespace bvar {
 
@@ -76,11 +79,22 @@ inline std::ostream& operator<<(std::ostream& os, const 
Stat& s) {
     }
 }
 
+namespace detail {
+struct AddStat {
+    void operator()(Stat& s1, const Stat& s2) const { s1 += s2; }
+};
+
+struct MinusStat {
+    void operator()(Stat& s1, const Stat& s2) const { s1 -= s2; }
+};
+} // namespace detail
+
 // For calculating average of numbers.
 // Example:
 //   IntRecorder latency;
 //   latency << 1 << 3 << 5;
 //   CHECK_EQ(3, latency.average());
+#if !WITH_BABYLON_COUNTER
 class IntRecorder : public Variable {
 public:
     // Compressing format:
@@ -92,16 +106,10 @@ public:
     BAIDU_CASSERT(SUM_BIT_WIDTH > 32 && SUM_BIT_WIDTH < 64, 
                   SUM_BIT_WIDTH_must_be_between_33_and_63);
 
-    struct AddStat {
-        void operator()(Stat& s1, const Stat& s2) const { s1 += s2; }
-    };
-    struct MinusStat {
-        void operator()(Stat& s1, const Stat& s2) const { s1 -= s2; }
-    };    
-
     typedef Stat value_type;
     typedef detail::ReducerSampler<IntRecorder, Stat,
-                                   AddStat, MinusStat> sampler_type;
+                                   detail::AddStat,
+                                   detail::MinusStat> sampler_type;
 
     typedef Stat SampleSet;
     
@@ -118,7 +126,7 @@ public:
 
     IntRecorder() : _combiner(std::make_shared<combiner_type>()), 
_sampler(NULL) {}
 
-    explicit IntRecorder(const butil::StringPiece& name) : IntRecorder() {
+    IntRecorder(const butil::StringPiece& name) : IntRecorder() {
         expose(name);
     }
 
@@ -154,8 +162,8 @@ public:
         return _combiner->reset_all_agents();
     }
 
-    AddStat op() const { return AddStat(); }
-    MinusStat inv_op() const { return MinusStat(); }
+    detail::AddStat op() const { return detail::AddStat(); }
+    detail::MinusStat inv_op() const { return detail::MinusStat(); }
     
     void describe(std::ostream& os, bool /*quote_string*/) const override {
         os << get_value();
@@ -286,6 +294,100 @@ inline IntRecorder& IntRecorder::operator<<(int64_t 
sample) {
                  n, _compress(num + 1, sum + complement)));
     return *this;
 }
+#else // WITH_BABYLON_COUNTER
+class IntRecorder : public Variable {
+public:
+    typedef Stat value_type;
+    typedef detail::AddStat Op;
+    typedef detail::MinusStat InvOp;
+    typedef detail::ReducerSampler<IntRecorder, value_type, Op, InvOp> 
sampler_type;
+
+    COMMON_VARIABLE_CONSTRUCTOR(IntRecorder);
+
+    DISALLOW_COPY_AND_MOVE(IntRecorder);
+
+    ~IntRecorder() override {
+        hide();
+        if (NULL != _sampler) {
+            _sampler->destroy();
+        }
+    }
+
+    // Note: The input type is acutally int. Use int64_t to check overflow.
+    IntRecorder& operator<<(int64_t value) {
+        if (BAIDU_UNLIKELY((int64_t)(int)value != value)) {
+            const char* reason = NULL;
+            if (value > std::numeric_limits<int>::max()) {
+                reason = "overflows";
+                value = std::numeric_limits<int>::max();
+            } else {
+                reason = "underflows";
+                value = std::numeric_limits<int>::min();
+            }
+            // Truncate to be max or min of int. We're using 44 bits to store 
the
+            // sum thus following aggregations are not likely to be 
over/underflow.
+            if (!name().empty()) {
+                LOG(WARNING) << "Input=" << value << " to `" << name()
+                           << "\' " << reason;
+            } else if (!_debug_name.empty()) {
+                LOG(WARNING) << "Input=" << value << " to `" << _debug_name
+                           << "\' " << reason;
+            } else {
+                LOG(WARNING) << "Input=" << value << " to IntRecorder("
+                           << (void*)this << ") " << reason;
+            }
+        }
+
+        _summer << value;
+        return *this;
+    }
+
+    int64_t average() const {
+        return get_value().get_average_int();
+    }
+
+    double average(double) const {
+        return get_value().get_average_double();
+    }
+
+    value_type get_value() const {
+        auto summary = _summer.value();
+        return value_type{summary.sum, static_cast<ssize_t>(summary.num)};
+    }
+
+    value_type reset() {
+        LOG_EVERY_SECOND(ERROR) << "IntRecorder with babylon counter should 
never call reset()";
+        return get_value();
+    }
+
+    Op op() const { return Op(); }
+    InvOp inv_op() const { return InvOp(); }
+
+    void describe(::std::ostream& os, bool) const override {
+        os << get_value();
+    }
+
+    bool valid() const { return true; }
+
+    sampler_type* get_sampler() {
+        if (NULL == _sampler) {
+            _sampler = new sampler_type(this);
+            _sampler->schedule();
+        }
+        return _sampler;
+    }
+
+    // This name is useful for printing overflow log in operator<< since
+    // IntRecorder is often used as the source of data and not exposed.
+    void set_debug_name(const butil::StringPiece& name) {
+        _debug_name.assign(name.data(), name.size());
+    }
+private:
+    babylon::ConcurrentSummer _summer;
+    sampler_type* _sampler{NULL};
+    std::string _debug_name;
+};
+#endif // WITH_BABYLON_COUNTER
 
 }  // namespace bvar
 
diff --git a/src/bvar/reducer.h b/src/bvar/reducer.h
index ccf78054..543e77c8 100644
--- a/src/bvar/reducer.h
+++ b/src/bvar/reducer.h
@@ -29,9 +29,133 @@
 #include "bvar/detail/sampler.h"                  // ReducerSampler
 #include "bvar/detail/series.h"
 #include "bvar/window.h"
+#if WITH_BABYLON_COUNTER
+#include "babylon/concurrent/counter.h"
+#endif // WITH_BABYLON_COUNTER
 
 namespace bvar {
 
+namespace detail {
+template<typename O, typename T, typename Op>
+class SeriesSamplerImpl : public Sampler {
+public:
+    SeriesSamplerImpl(O* owner, const Op& op)
+        : _owner(owner), _series(op) {}
+    void take_sample() override { _series.append(_owner->get_value()); }
+    void describe(std::ostream& os) { _series.describe(os, NULL); }
+
+private:
+    O* _owner;
+    Series<T, Op> _series;
+};
+
+#if WITH_BABYLON_COUNTER
+template<typename T, typename Counter, typename Op, typename InvOp>
+class BabylonVariable: public Variable {
+public:
+    typedef ReducerSampler<BabylonVariable, T, Op, InvOp> sampler_type;
+    typedef SeriesSamplerImpl<BabylonVariable, T, Op> series_sampler_type;
+
+    BabylonVariable() = default;
+
+    template<typename U = T, typename std::enable_if<
+        !std::is_constructible<Counter, U>::value, bool>::type = false>
+    BabylonVariable(U) {}
+    // For Maxer.
+    template<typename U = T, typename std::enable_if<
+        std::is_constructible<Counter, U>::value, bool>::type = false>
+    BabylonVariable(U default_value) : _counter(default_value) {}
+
+    DISALLOW_COPY_AND_MOVE(BabylonVariable);
+
+    ~BabylonVariable() override {
+        hide();
+        if (NULL != _sampler) {
+            _sampler->destroy();
+        }
+        if (NULL != _series_sampler) {
+            _series_sampler->destroy();
+        }
+    }
+
+    BabylonVariable& operator<<(T value) {
+        _counter << value;
+        return *this;
+    }
+
+    sampler_type* get_sampler() {
+        if (NULL == _sampler) {
+            _sampler = new sampler_type(this);
+            _sampler->schedule();
+        }
+        return _sampler;
+    }
+
+    T get_value() const {
+        return _counter.value();
+    }
+
+    T reset() {
+        if (BAIDU_UNLIKELY((!butil::is_same<VoidOp, InvOp>::value))) {
+            CHECK(false) << "You should not call Reducer<" << 
butil::class_name_str<T>()
+                         << ", " << butil::class_name_str<Op>() << 
">::get_value() when a"
+                         << " Window<> is used because the operator does not 
have inverse.";
+            return get_value();
+        }
+
+        T result = _counter.value();
+        _counter.reset();
+        return result;
+    }
+
+    bool valid() const { return true; }
+
+    const Op& op() const { return _op; }
+    const InvOp& inv_op() const { return _inv_op;}
+
+    void describe(std::ostream& os, bool quote_string) const override {
+        if (butil::is_same<T, std::string>::value && quote_string) {
+            os << '"' << get_value() << '"';
+        } else {
+            os << get_value();
+        }
+    }
+
+    int describe_series(std::ostream& os, const SeriesOptions& options) const 
override {
+        if (NULL == _series_sampler) {
+            return 1;
+        }
+        if (!options.test_only) {
+            _series_sampler->describe(os);
+        }
+        return 0;
+    }
+
+protected:
+    int expose_impl(const butil::StringPiece& prefix,
+                    const butil::StringPiece& name,
+                    DisplayFilter display_filter) override {
+        const int rc = Variable::expose_impl(prefix, name, display_filter);
+        if (rc == 0 && NULL == _series_sampler &&
+            !butil::is_same<InvOp, VoidOp>::value &&
+            !butil::is_same<T, std::string>::value &&
+            FLAGS_save_series) {
+            _series_sampler = new series_sampler_type(this, _op);
+            _series_sampler->schedule();
+        }
+        return rc;
+    }
+
+private:
+    Counter _counter;
+    sampler_type* _sampler{NULL};
+    series_sampler_type* _series_sampler{NULL};
+    Op _op;
+    InvOp _inv_op;
+};
+#endif // WITH_BABYLON_COUNTER
+} // namespace detail
+
 // Reduce multiple values into one with `Op': e1 Op e2 Op e3 ...
 // `Op' shall satisfy:
 //   - associative:     a Op (b Op c) == (a Op b) Op c
@@ -68,22 +192,12 @@ namespace bvar {
 template <typename T, typename Op, typename InvOp = detail::VoidOp>
 class Reducer : public Variable {
 public:
-    typedef typename detail::AgentCombiner<T, T, Op> combiner_type;
+    typedef detail::AgentCombiner<T, T, Op> combiner_type;
     typedef typename combiner_type::self_shared_type shared_combiner_type;
     typedef typename combiner_type::Agent agent_type;
     typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type;
-    class SeriesSampler : public detail::Sampler {
-    public:
-        SeriesSampler(Reducer* owner, const Op& op)
-            : _owner(owner), _series(op) {}
-        void take_sample() override { _series.append(_owner->get_value()); }
-        void describe(std::ostream& os) { _series.describe(os, NULL); }
-    private:
-        Reducer* _owner;
-        detail::Series<T, Op> _series;
-    };
+    typedef detail::SeriesSamplerImpl<Reducer, T, Op> SeriesSampler;
 
-public:
     // The `identify' must satisfy: identity Op a == a
     explicit Reducer(typename butil::add_cr_non_integral<T>::type identity = 
T(),
                      const Op& op = Op(), const InvOp& inv_op = InvOp())
@@ -205,34 +319,55 @@ inline Reducer<T, Op, InvOp>& Reducer<T, Op, 
InvOp>::operator<<(
 namespace detail {
 template <typename Tp>
 struct AddTo {
-    void operator()(Tp & lhs, 
+    void operator()(Tp & lhs,
                     typename butil::add_cr_non_integral<Tp>::type rhs) const
     { lhs += rhs; }
 };
 template <typename Tp>
 struct MinusFrom {
-    void operator()(Tp & lhs, 
+    void operator()(Tp & lhs,
                     typename butil::add_cr_non_integral<Tp>::type rhs) const
     { lhs -= rhs; }
 };
-}
-template <typename T>
+} // namespace detail
+
+template <typename T, typename = void>
 class Adder : public Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > {
 public:
     typedef Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > Base;
     typedef T value_type;
     typedef typename Base::sampler_type sampler_type;
-public:
+
     Adder() : Base() {}
-    explicit Adder(const butil::StringPiece& name) : Base() {
+    Adder(const butil::StringPiece& name) : Base() {
         this->expose(name);
     }
     Adder(const butil::StringPiece& prefix,
           const butil::StringPiece& name) : Base() {
         this->expose_as(prefix, name);
     }
-    ~Adder() { Variable::hide(); }
+    ~Adder() override { Variable::hide(); }
+};
+
+#if WITH_BABYLON_COUNTER
+// Numerical types supported by babylon counter.
+template <typename T>
+class Adder<T, 
std::enable_if<std::is_constructible<babylon::GenericsConcurrentAdder<T>>::value>>
+    : public detail::BabylonVariable<T, babylon::GenericsConcurrentAdder<T>,
+                                     detail::AddTo<T>, detail::MinusFrom<T>> {
+public:
+    typedef  T value_type;
+private:
+    typedef detail::BabylonVariable<T, babylon::GenericsConcurrentAdder<T>,
+                                    detail::AddTo<value_type>, 
detail::MinusFrom<value_type>> Base;
+public:
+    typedef detail::AddTo<value_type> Op;
+    typedef detail::MinusFrom<value_type> InvOp;
+    typedef typename Base::sampler_type sampler_type;
+
+    COMMON_VARIABLE_CONSTRUCTOR(Adder);
 };
+#endif // WITH_BABYLON_COUNTER
 
 // bvar::Maxer<int> max_value;
 // max_value << 1 << 2 << 3 << 4;
@@ -248,17 +383,19 @@ struct MaxTo {
         }
     }
 };
+
 class LatencyRecorderBase;
-}
-template <typename T>
+} // namespace detail
+
+template <typename T, typename = void>
 class Maxer : public Reducer<T, detail::MaxTo<T> > {
 public:
     typedef Reducer<T, detail::MaxTo<T> > Base;
     typedef T value_type;
     typedef typename Base::sampler_type sampler_type;
-public:
+
     Maxer() : Base(std::numeric_limits<T>::min()) {}
-    explicit Maxer(const butil::StringPiece& name)
+    Maxer(const butil::StringPiece& name)
         : Base(std::numeric_limits<T>::min()) {
         this->expose(name);
     }
@@ -266,7 +403,8 @@ public:
         : Base(std::numeric_limits<T>::min()) {
         this->expose_as(prefix, name);
     }
-    ~Maxer() { Variable::hide(); }
+    ~Maxer() override { Variable::hide(); }
+
 private:
     friend class detail::LatencyRecorderBase;
     // The following private funcition a now used in LatencyRecorder,
@@ -283,32 +421,83 @@ private:
     }
 };
 
+#if WITH_BABYLON_COUNTER
+namespace detail {
+template <typename T>
+class ConcurrentMaxer : public babylon::GenericsConcurrentMaxer<T> {
+    typedef babylon::GenericsConcurrentMaxer<T> Base;
+public:
+    ConcurrentMaxer() = default;
+    ConcurrentMaxer(T default_value) : _default_value(default_value) {}
+
+    T value() const {
+        T result;
+        if (!Base::value(result)) {
+            return _default_value;
+        }
+        return std::max(result, _default_value);
+    }
+private:
+    T _default_value{0};
+};
+} // namespace detail
+
+// Numerical types supported by babylon counter.
+template <typename T>
+class Maxer<T, 
std::enable_if<std::is_constructible<detail::ConcurrentMaxer<T>>::value>>
+    : public detail::BabylonVariable<T, detail::ConcurrentMaxer<T>,
+                                     detail::MaxTo<T>, detail::VoidOp> {
+public:
+    typedef T value_type;
+private:
+    typedef detail::BabylonVariable<T, detail::ConcurrentMaxer<T>,
+                                    detail::MaxTo<value_type>, detail::VoidOp> 
Base;
+public:
+    typedef detail::MaxTo<value_type> Op;
+    typedef detail::VoidOp InvOp;
+    typedef typename Base::sampler_type sampler_type;
+
+    COMMON_VARIABLE_CONSTRUCTOR(Maxer);
+
+private:
+friend class detail::LatencyRecorderBase;
+
+    Maxer(T default_value) : Base(default_value) {}
+    Maxer(T default_value, const butil::StringPiece& prefix, const 
butil::StringPiece& name)
+        : Base(default_value) {
+        Variable::expose_as(prefix, name);
+    }
+    Maxer(T default_value, const butil::StringPiece& name)
+        : Base(default_value) {
+        Variable::expose(name);
+    }
+};
+#endif // WITH_BABYLON_COUNTER
+
 // bvar::Miner<int> min_value;
 // min_value << 1 << 2 << 3 << 4;
 // LOG(INFO) << min_value.get_value(); // 1
 namespace detail {
-
-template <typename Tp> 
+template <typename Tp>
 struct MinTo {
-    void operator()(Tp & lhs, 
+    void operator()(Tp & lhs,
                     typename butil::add_cr_non_integral<Tp>::type rhs) const {
         if (rhs < lhs) {
             lhs = rhs;
         }
     }
 };
-
 }  // namespace detail
 
-template <typename T>
+template <typename T, typename = void>
 class Miner : public Reducer<T, detail::MinTo<T> > {
 public:
     typedef Reducer<T, detail::MinTo<T> > Base;
     typedef T value_type;
     typedef typename Base::sampler_type sampler_type;
-public:
+
     Miner() : Base(std::numeric_limits<T>::max()) {}
-    explicit Miner(const butil::StringPiece& name)
+    Miner(const butil::StringPiece& name)
         : Base(std::numeric_limits<T>::max()) {
         this->expose(name);
     }
@@ -316,8 +505,28 @@ public:
         : Base(std::numeric_limits<T>::max()) {
         this->expose_as(prefix, name);
     }
-    ~Miner() { Variable::hide(); }
+    ~Miner() override { Variable::hide(); }
+};
+
+#if WITH_BABYLON_COUNTER
+// Numerical types supported by babylon counter.
+template <typename T>
+class Miner<T, 
std::enable_if<std::is_constructible<babylon::GenericsConcurrentMiner<T>>::value>>
+    : public detail::BabylonVariable<T, babylon::GenericsConcurrentMiner<T>,
+                                     detail::MinTo<T>, detail::VoidOp> {
+public:
+    typedef T value_type;
+private:
+    typedef detail::BabylonVariable<value_type, 
babylon::GenericsConcurrentMiner<T>,
+                                    detail::MinTo<value_type>, detail::VoidOp> 
Base;
+public:
+    typedef detail::MinTo<value_type> Op;
+    typedef detail::VoidOp InvOp;
+    typedef typename Base::sampler_type sampler_type;
+
+    COMMON_VARIABLE_CONSTRUCTOR(Miner);
 };
+#endif // WITH_BABYLON_COUNTER
 
 }  // namespace bvar
 
diff --git a/src/bvar/variable.h b/src/bvar/variable.h
index 86e9cd0c..f01626fd 100644
--- a/src/bvar/variable.h
+++ b/src/bvar/variable.h
@@ -39,6 +39,16 @@ namespace bvar {
 
 DECLARE_bool(save_series);
 
+#define COMMON_VARIABLE_CONSTRUCTOR(TypeName)                                  
  \
+    TypeName() = default;                                                      
  \
+    TypeName(const butil::StringPiece& name) {                                 
  \
+        this->expose(name);                                                    
  \
+    }                                                                          
  \
+    TypeName(const butil::StringPiece& prefix, const butil::StringPiece& name) 
{ \
+        this->expose_as(prefix, name);                                         
  \
+    }                                                                          
  \
+
+
 // Bitwise masks of displayable targets 
 enum DisplayFilter {
     DISPLAY_ON_HTML = 1,
diff --git a/test/bvar_lock_timer_unittest.cpp 
b/test/bvar_lock_timer_unittest.cpp
index f69ba3a2..c4299b55 100644
--- a/test/bvar_lock_timer_unittest.cpp
+++ b/test/bvar_lock_timer_unittest.cpp
@@ -197,8 +197,12 @@ TEST_F(LockTimerTest, double_lock_time) {
     ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads), (size_t)r1.count());
     LOG(INFO) << r0;
     LOG(INFO) << r1._latency;
+#if !WITH_BABYLON_COUNTER
+    // reset() of IntRecorder with babylon counter does not work,
+    // should never call reset().
     r0.reset();
     r1._latency.reset();
+#endif // !WITH_BABYLON_COUNTER
     DoubleLockArg<M1, M0> arg1;
     arg1.m0.set_recorder(r1);
     arg1.m1.set_recorder(r0);
@@ -209,8 +213,13 @@ TEST_F(LockTimerTest, double_lock_time) {
     for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
         pthread_join(threads[i], NULL);
     }
+#if !WITH_BABYLON_COUNTER
     ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads), 
(size_t)r0.get_value().num);
     ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads), (size_t)r1.count());
+#else
+    ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads) * 2, 
(size_t)r0.get_value().num);
+    ASSERT_EQ(OPS_PER_THREAD * ARRAY_SIZE(threads) * 2, (size_t)r1.count());
+#endif // !WITH_BABYLON_COUNTER
     LOG(INFO) << r0;
     LOG(INFO) << r1._latency;
 }
diff --git a/test/bvar_percentile_unittest.cpp 
b/test/bvar_percentile_unittest.cpp
index 696e2a5c..f647e272 100644
--- a/test/bvar_percentile_unittest.cpp
+++ b/test/bvar_percentile_unittest.cpp
@@ -137,6 +137,7 @@ TEST_F(PercentileTest, merge2) {
                    << " belong_to_b2=" << belong_to_b2;
 }
 
+#if !WITH_BABYLON_COUNTER
 TEST_F(PercentileTest, combine_of) {
     // Combine multiple percentle samplers into one
     const int num_samplers = 10;
@@ -146,7 +147,7 @@ TEST_F(PercentileTest, combine_of) {
     const int  N = 1000;
     size_t belongs[num_samplers] = {0};
     size_t total = 0;
-    for (int repeat = 0; repeat < 100; ++repeat) {
+    for (int repeat = 0; repeat < 1; ++repeat) {
         bvar::detail::Percentile p[num_samplers];
         for (int i = 0; i < num_samplers; ++i) {
             for (int j = 0; j < N * (i + 1); ++j) {
@@ -186,3 +187,4 @@ TEST_F(PercentileTest, combine_of) {
                   
     }
 }
+#endif // !WITH_BABYLON_COUNTER
diff --git a/test/bvar_recorder_unittest.cpp b/test/bvar_recorder_unittest.cpp
index 659db01b..c0d32062 100644
--- a/test/bvar_recorder_unittest.cpp
+++ b/test/bvar_recorder_unittest.cpp
@@ -29,6 +29,7 @@
 #include <gtest/gtest.h>
 
 namespace {
+#if !WITH_BABYLON_COUNTER
 TEST(RecorderTest, test_complement) {
     LOG(INFO) << "sizeof(LatencyRecorder)=" << sizeof(bvar::LatencyRecorder)
               << " " << sizeof(bvar::detail::Percentile)
@@ -61,6 +62,7 @@ TEST(RecorderTest, test_compress_negtive_number) {
         ASSERT_EQ(a, 
bvar::IntRecorder::_extend_sign_bit(bvar::IntRecorder::_get_sum(compressed)));
     }
 }
+#endif // !WITH_BABYLON_COUNTER
 
 TEST(RecorderTest, sanity) {
     {
diff --git a/test/bvar_reducer_unittest.cpp b/test/bvar_reducer_unittest.cpp
index 45e42cd3..48e13b3c 100644
--- a/test/bvar_reducer_unittest.cpp
+++ b/test/bvar_reducer_unittest.cpp
@@ -248,8 +248,8 @@ void ReducerTest_window() {
 }
 
 TEST_F(ReducerTest, window) {
-#if !BRPC_WITH_GLOG
     ReducerTest_window();
+#if !BRPC_WITH_GLOG
     logging::StringSink log_str;
     logging::LogSink* old_sink = logging::SetLogSink(&log_str);
     sleep(1);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to