Copilot commented on code in PR #47:
URL: https://github.com/apache/paimon-cpp/pull/47#discussion_r3353909353


##########
src/paimon/common/metrics/histogram.cpp:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/histogram.h"
+
+#include <algorithm>
+#include <cmath>
+
+namespace paimon {
+
+HistogramImpl::HistogramImpl() : bucket_counts_(BucketLimits().size(), 0) {}
+
+void HistogramImpl::Add(double value) {
+    if (std::isnan(value)) {
+        return;
+    }
+    std::lock_guard<std::mutex> guard(mu_);
+    const auto& limits = BucketLimits();
+    size_t idx = BucketIndex(value, limits);
+    bucket_counts_[idx] += 1;
+    num_ += 1;
+    sum_ += value;
+    sum_squares_ += value * value;
+    if (value < min_) {
+        min_ = value;
+    }
+    if (value > max_) {
+        max_ = value;
+    }
+}
+
+HistogramStats HistogramImpl::GetStats() const {
+    return ToStats(GetSnapshot());
+}
+
+HistogramImpl::Snapshot HistogramImpl::GetSnapshot() const {
+    std::lock_guard<std::mutex> guard(mu_);
+    Snapshot s;
+    s.count = num_;
+    s.sum = sum_;
+    s.sum_squares = sum_squares_;
+    s.min = (num_ == 0) ? 0 : min_;
+    s.max = (num_ == 0) ? 0 : max_;
+    s.bucket_counts = bucket_counts_;
+    return s;
+}
+
+void HistogramImpl::Merge(const Histogram& other) {
+    const auto* other_impl = dynamic_cast<const HistogramImpl*>(&other);
+    if (!other_impl) {
+        return;
+    }
+    MergeFrom(*other_impl);
+}
+
+void HistogramImpl::MergeFrom(const HistogramImpl& other) {
+    Snapshot other_snapshot = other.GetSnapshot();
+    std::lock_guard<std::mutex> guard(mu_);
+    if (bucket_counts_.size() != other_snapshot.bucket_counts.size()) {
+        // Should never happen unless bucket config changes.
+        return;
+    }
+    if (other_snapshot.count == 0) {
+        return;
+    }
+    for (size_t i = 0; i < bucket_counts_.size(); ++i) {
+        bucket_counts_[i] += other_snapshot.bucket_counts[i];
+    }
+    if (num_ == 0) {
+        min_ = other_snapshot.min;
+        max_ = other_snapshot.max;
+    } else {
+        min_ = std::min(min_, other_snapshot.min);
+        max_ = std::max(max_, other_snapshot.max);
+    }
+    num_ += other_snapshot.count;
+    sum_ += other_snapshot.sum;
+    sum_squares_ += other_snapshot.sum_squares;
+}
+
+std::shared_ptr<Histogram> HistogramImpl::Clone() const {
+    Snapshot s = GetSnapshot();
+    auto cloned = std::make_shared<HistogramImpl>();
+    std::lock_guard<std::mutex> guard(cloned->mu_);
+    cloned->num_ = s.count;
+    cloned->sum_ = s.sum;
+    cloned->sum_squares_ = s.sum_squares;
+    cloned->min_ = (s.count == 0) ? std::numeric_limits<double>::infinity() : 
s.min;
+    cloned->max_ = s.max;
+    cloned->bucket_counts_ = std::move(s.bucket_counts);
+    return cloned;
+}
+
+HistogramStats HistogramImpl::ToStats(const Snapshot& s) {
+    HistogramStats stats;
+    stats.count = s.count;
+    stats.sum = s.sum;
+    stats.min = s.min;
+    stats.max = s.max;
+    stats.average = (s.count == 0) ? 0 : (s.sum / 
static_cast<double>(s.count));
+    stats.p50 = EstimatePercentile(s, 0.50);
+    stats.p90 = EstimatePercentile(s, 0.90);
+    stats.p95 = EstimatePercentile(s, 0.95);
+    stats.p99 = EstimatePercentile(s, 0.99);
+    stats.p999 = EstimatePercentile(s, 0.999);
+    stats.stddev = EstimateStddev(s);
+    return stats;
+}
+
+const std::vector<double>& HistogramImpl::BucketLimits() {
+    static const std::vector<double> limits = []() {
+        // Keep bucket generation aligned with RocksDB's HistogramBucketMapper
+        // (see rocksdb/monitoring/histogram.cc).
+        //
+        // The mapper starts from {1,2}, then grows by *1.5 and rounds each 
bucket boundary to
+        // be human-readable by keeping the two most significant digits.
+        std::vector<double> v;
+        v.reserve(256);
+        v.push_back(1);
+        v.push_back(2);
+
+        double bucket_val = v.back();
+        const auto max_u64 = 
static_cast<double>(std::numeric_limits<uint64_t>::max());
+        while ((bucket_val = 1.5 * bucket_val) <= max_u64) {
+            auto rounded = static_cast<uint64_t>(bucket_val);
+            // Keep two most significant digits (e.g., 172 -> 170).
+            uint64_t pow_of_ten = 1;
+            while (rounded / 10 > 10) {
+                rounded /= 10;
+                pow_of_ten *= 10;
+            }
+            rounded *= pow_of_ten;
+            v.push_back(static_cast<double>(rounded));
+        }
+        return v;
+    }();
+    return limits;
+}
+
+size_t HistogramImpl::BucketIndex(double value, const std::vector<double>& 
limits) {
+    if (value <= limits.front()) {
+        return 0;
+    }
+    auto it = std::lower_bound(limits.begin(), limits.end(), value);
+    if (it == limits.end()) {
+        return limits.size() - 1;
+    }
+    return static_cast<size_t>(std::distance(limits.begin(), it));
+}
+
+double HistogramImpl::EstimatePercentile(const Snapshot& s, double p) {
+    if (s.count == 0) {
+        return 0;
+    }
+    if (p <= 0) {
+        return s.min;
+    }
+    if (p >= 1) {
+        return s.max;
+    }
+    const auto& limits = BucketLimits();
+    const auto rank = static_cast<uint64_t>(std::ceil(p * 
static_cast<double>(s.count)));
+    uint64_t cum = 0;
+    for (size_t i = 0; i < s.bucket_counts.size(); ++i) {
+        uint64_t bcnt = s.bucket_counts[i];
+        if (bcnt == 0) {
+            continue;
+        }
+        uint64_t prev = cum;
+        cum += bcnt;
+        if (cum < rank) {
+            continue;
+        }
+
+        // Interpolate inside bucket: estimate that samples are uniformly 
distributed
+        // between bucket lower and upper bounds.
+        double upper = (i < limits.size()) ? limits[i] : s.max;
+        double lower = (i == 0) ? std::min(s.min, 0.0) : limits[i - 1];

Review Comment:
   Percentile estimation effectively uses `upper = limits[i]` for every bucket 
index, including the last bucket: `bucket_counts` is sized to `limits.size()`, 
so the `i < limits.size()` check is always true and the `s.max` fallback is 
never reached. Since `BucketIndex()` maps values greater than the largest limit 
into the last bucket, the last bucket’s upper bound should be `s.max` 
(otherwise percentiles can be severely under-estimated when values exceed the 
last configured limit). Adjust the logic so the last bucket uses `s.max` as its 
upper bound (e.g. test `i + 1 < limits.size()` instead).



##########
src/paimon/common/metrics/histogram.h:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "paimon/metrics.h"
+
+namespace paimon {
+
+// Internal histogram interface used by MetricsImpl.
+//
+// Note: this interface is intentionally NOT exported.
+class Histogram {
+ public:
+    virtual ~Histogram() = default;
+
+    virtual void Add(double value) = 0;
+    virtual HistogramStats GetStats() const = 0;
+    virtual void Merge(const Histogram& other) = 0;
+    virtual std::shared_ptr<Histogram> Clone() const = 0;
+};
+
+// Internal histogram implementation.
+//
+// The bucket boundaries are aligned with RocksDB's HistogramBucketMapper
+// (rocksdb/monitoring/histogram.cc).
+class HistogramImpl : public Histogram {
+ public:
+    HistogramImpl();
+
+    void Add(double value) override;
+
+    HistogramStats GetStats() const override;
+
+    void Merge(const Histogram& other) override;
+
+    std::shared_ptr<Histogram> Clone() const override;
+
+    struct Snapshot {
+        uint64_t count = 0;
+        double sum = 0;
+        double sum_squares = 0;
+        double min = 0;
+        double max = 0;
+        std::vector<uint64_t> bucket_counts;
+    };
+
+    Snapshot GetSnapshot() const;
+
+    void MergeFrom(const HistogramImpl& other);
+
+    static HistogramStats ToStats(const Snapshot& s);
+
+ private:
+    static const std::vector<double>& BucketLimits();
+    static size_t BucketIndex(double value, const std::vector<double>& limits);
+    static double EstimatePercentile(const Snapshot& s, double p);
+    static double EstimateStddev(const Snapshot& s);
+
+ private:
+    mutable std::mutex mu_;
+    uint64_t num_ = 0;
+    double sum_ = 0;
+    double sum_squares_ = 0;
+    double min_ = std::numeric_limits<double>::infinity();
+    double max_ = 0;

Review Comment:
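   `max_` is initialized to `0`, which produces incorrect results when all 
observed values are negative (max stays `0` even though no non-negative sample 
was ever added). Initialize `max_` to `-infinity` / 
`std::numeric_limits<double>::lowest()` (and keep the existing `num_ == 0` 
handling in snapshots) so max tracking is correct for negative-only datasets. 
`Clone()` should then also restore that sentinel for an empty snapshot, as it 
already does for `min_`; otherwise a clone of an empty histogram starts with 
`max_ == 0` again.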



##########
src/paimon/common/factories/factory_creator_test.cpp:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/factories/factory_creator.h"
+
+#include <algorithm>
+#include <new>
+
+#include "gtest/gtest.h"
+#include "paimon/factories/factory.h"
+
+namespace paimon::test {
+
+class MockFactory : public Factory {
+ public:
+    explicit MockFactory(const std::string& name) : name_(name) {}
+    const char* Identifier() const override {
+        return "mock";
+    }
+    std::string GetName() const {
+        return name_;
+    }
+
+ private:
+    std::string name_;
+};
+
+class FactoryCreatorTest : public ::testing::Test {
+ protected:
+    void SetUp() override {
+        factory_creator_ = FactoryCreator::GetInstance();
+    }
+
+    void TearDown() override {
+        factory_creator_->~FactoryCreator();
+        new (factory_creator_) FactoryCreator();
+    }

Review Comment:
   Manually calling the destructor and then placement-new’ing a singleton 
instance is undefined behavior for many singleton implementations and can also 
lead to double-destruction at process shutdown. Prefer adding a 
`ResetForTest()`/`Clear()` API on `FactoryCreator` to clear internal 
registries, or change the test to create/destroy a non-singleton 
`FactoryCreator` instance that is owned by the test fixture.



##########
src/paimon/common/metrics/histogram_windowing.cpp:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/histogram_windowing.h"
+
+#include <chrono>
+
+namespace paimon {
+
+uint64_t HistogramWindowingImpl::NowMicros() {
+    return std::chrono::duration_cast<std::chrono::microseconds>(
+               std::chrono::system_clock::now().time_since_epoch())
+        .count();
+}

Review Comment:
   `system_clock` is not monotonic and can jump forwards/backwards (NTP, manual 
adjustments). Since this component is doing windowing based on elapsed time, a 
monotonic clock (`std::chrono::steady_clock`) is a better fit and avoids 
dropping samples / unexpected resets when wall clock changes.



##########
src/paimon/common/metrics/metrics_impl.cpp:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/metrics_impl.h"
+
+#include <utility>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/metrics/histogram_windowing.h"
+#include "paimon/result.h"
+#include "rapidjson/document.h"
+#include "rapidjson/encodings.h"
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace paimon {
+void MetricsImpl::SetCounter(const std::string& metric_name, uint64_t 
metric_value) {
+    std::lock_guard<std::mutex> guard(counter_lock_);
+    counters_[metric_name] = metric_value;
+}
+
+Result<uint64_t> MetricsImpl::GetCounter(const std::string& metric_name) const 
{
+    std::lock_guard<std::mutex> guard(counter_lock_);
+    auto iter = counters_.find(metric_name);
+    if (iter != counters_.end()) {
+        return iter->second;
+    }
+    return Status::KeyError(fmt::format("metric '{}' not found", metric_name));
+}
+
+std::map<std::string, uint64_t> MetricsImpl::GetAllCounters() const {
+    std::lock_guard<std::mutex> guard(counter_lock_);
+    return counters_;
+}
+
+void MetricsImpl::ObserveHistogram(const std::string& metric_name, double 
value) {
+    std::shared_ptr<Histogram> histogram;
+    {
+        std::lock_guard<std::mutex> guard(histogram_lock_);
+        auto iter = histograms_.find(metric_name);
+        if (iter == histograms_.end()) {
+            histogram = std::make_shared<HistogramWindowingImpl>();
+            histograms_.emplace(metric_name, histogram);
+        } else {
+            histogram = iter->second;
+        }
+    }
+    histogram->Add(value);
+}
+
+Result<HistogramStats> MetricsImpl::GetHistogramStats(const std::string& 
metric_name) const {
+    std::shared_ptr<Histogram> histogram;
+    {
+        std::lock_guard<std::mutex> guard(histogram_lock_);
+        auto iter = histograms_.find(metric_name);
+        if (iter == histograms_.end()) {
+            return Status::KeyError(fmt::format("histogram '{}' not found", 
metric_name));
+        }
+        histogram = iter->second;
+    }
+    return histogram->GetStats();
+}
+
+std::map<std::string, HistogramStats> MetricsImpl::GetAllHistogramStats() 
const {
+    std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> histograms;
+    {
+        std::lock_guard<std::mutex> guard(histogram_lock_);
+        histograms.reserve(histograms_.size());
+        for (const auto& kv : histograms_) {
+            histograms.push_back(kv);
+        }
+    }
+    std::map<std::string, HistogramStats> res;
+    for (const auto& [name, histogram] : histograms) {
+        res.emplace(name, histogram->GetStats());
+    }
+    return res;
+}
+
+void MetricsImpl::SetGauge(const std::string& metric_name, double value) {
+    std::lock_guard<std::mutex> lock(gauge_lock_);
+    gauges_[metric_name] = value;
+}
+
+Result<double> MetricsImpl::GetGauge(const std::string& metric_name) const {
+    std::lock_guard<std::mutex> lock(gauge_lock_);
+    auto it = gauges_.find(metric_name);
+    if (it != gauges_.end()) {
+        return it->second;
+    }
+    return Status::KeyError(fmt::format("metric '{}' not found", metric_name));
+}
+
+std::map<std::string, double> MetricsImpl::GetAllGauges() const {
+    std::lock_guard<std::mutex> lock(gauge_lock_);
+    return gauges_;
+}
+
+void MetricsImpl::Merge(const std::shared_ptr<Metrics>& other) {
+    if (other && this != other.get()) {
+        std::map<std::string, uint64_t> other_counters = 
other->GetAllCounters();
+        {
+            std::lock_guard<std::mutex> guard(counter_lock_);
+            for (const auto& kv : other_counters) {
+                auto iter = counters_.find(kv.first);
+                if (iter == counters_.end()) {
+                    counters_[kv.first] = kv.second;
+                } else {
+                    counters_[kv.first] += kv.second;
+                }
+            }
+        }
+        std::map<std::string, double> other_gauges = other->GetAllGauges();
+        {
+            std::lock_guard<std::mutex> guard(gauge_lock_);
+            for (const auto& kv : other_gauges) {
+                auto iter = gauges_.find(kv.first);
+                if (iter == gauges_.end()) {
+                    gauges_[kv.first] = kv.second;
+                } else {
+                    gauges_[kv.first] += kv.second;
+                }
+            }
+        }
+        auto other_impl = std::dynamic_pointer_cast<MetricsImpl>(other);
+        if (other_impl) {
+            std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> 
other_histograms;
+            {
+                std::lock_guard<std::mutex> guard(other_impl->histogram_lock_);
+                other_histograms.reserve(other_impl->histograms_.size());
+                for (const auto& kv : other_impl->histograms_) {
+                    other_histograms.push_back(kv);
+                }
+            }
+
+            for (const auto& [name, other_hist] : other_histograms) {
+                std::shared_ptr<Histogram> this_hist;
+                {
+                    std::lock_guard<std::mutex> guard(histogram_lock_);
+                    auto iter = histograms_.find(name);
+                    if (iter == histograms_.end()) {
+                        this_hist = std::make_shared<HistogramWindowingImpl>();
+                        histograms_.emplace(name, this_hist);
+                    } else {
+                        this_hist = iter->second;
+                    }
+                }
+                this_hist->Merge(*other_hist);
+            }
+        }
+    }
+}
+
+void MetricsImpl::Overwrite(const std::shared_ptr<Metrics>& other) {
+    if (other && this != other.get()) {
+        std::map<std::string, uint64_t> other_counters = 
other->GetAllCounters();
+        {
+            std::lock_guard<std::mutex> guard(counter_lock_);
+            counters_.swap(other_counters);
+        }
+        std::map<std::string, double> other_gauges = other->GetAllGauges();
+        {
+            std::lock_guard<std::mutex> guard(gauge_lock_);
+            gauges_.swap(other_gauges);
+        }
+
+        auto other_impl = std::dynamic_pointer_cast<MetricsImpl>(other);
+        std::map<std::string, std::shared_ptr<Histogram>> new_histograms;
+        if (other_impl) {
+            std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> 
other_histograms;
+            {
+                std::lock_guard<std::mutex> lock(other_impl->histogram_lock_);
+                other_histograms.reserve(other_impl->histograms_.size());
+                for (const auto& kv : other_impl->histograms_) {
+                    other_histograms.push_back(kv);
+                }
+            }
+            for (const auto& [name, histogram] : other_histograms) {
+                new_histograms.emplace(name, histogram->Clone());
+            }
+        }
+        {
+            std::lock_guard<std::mutex> lock(histogram_lock_);
+            histograms_.swap(new_histograms);
+        }
+    }
+}
+
+std::string MetricsImpl::ToString() const {
+    using RapidWriter =
+        rapidjson::Writer<rapidjson::StringBuffer, rapidjson::UTF8<>, 
rapidjson::ASCII<>>;
+    rapidjson::Document doc;
+    doc.SetObject();
+    rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
+
+    std::map<std::string, uint64_t> counters = GetAllCounters();
+    for (const auto& kv : counters) {
+        doc.AddMember(rapidjson::Value(kv.first, allocator), 
rapidjson::Value(kv.second),
+                      allocator);

Review Comment:
   These `rapidjson::Value(std::string, allocator)` constructions depend on 
RapidJSON being built with std::string support enabled (e.g., 
`RAPIDJSON_HAS_STDSTRING`). To make this code robust across build 
configurations, construct string keys via `SetString(k.c_str(), 
SizeType(k.size()), allocator)` (or equivalent) rather than relying on 
std::string constructors.



##########
src/paimon/common/metrics/metrics_impl.cpp:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/metrics_impl.h"
+
+#include <utility>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/metrics/histogram_windowing.h"
+#include "paimon/result.h"
+#include "rapidjson/document.h"
+#include "rapidjson/encodings.h"
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace paimon {
+void MetricsImpl::SetCounter(const std::string& metric_name, uint64_t 
metric_value) {
+    std::lock_guard<std::mutex> guard(counter_lock_);
+    counters_[metric_name] = metric_value;
+}
+
+Result<uint64_t> MetricsImpl::GetCounter(const std::string& metric_name) const 
{
+    std::lock_guard<std::mutex> guard(counter_lock_);
+    auto iter = counters_.find(metric_name);
+    if (iter != counters_.end()) {
+        return iter->second;
+    }
+    return Status::KeyError(fmt::format("metric '{}' not found", metric_name));
+}
+
+std::map<std::string, uint64_t> MetricsImpl::GetAllCounters() const {
+    std::lock_guard<std::mutex> guard(counter_lock_);
+    return counters_;
+}
+
+void MetricsImpl::ObserveHistogram(const std::string& metric_name, double 
value) {
+    std::shared_ptr<Histogram> histogram;
+    {
+        std::lock_guard<std::mutex> guard(histogram_lock_);
+        auto iter = histograms_.find(metric_name);
+        if (iter == histograms_.end()) {
+            histogram = std::make_shared<HistogramWindowingImpl>();
+            histograms_.emplace(metric_name, histogram);
+        } else {
+            histogram = iter->second;
+        }
+    }
+    histogram->Add(value);
+}
+
+Result<HistogramStats> MetricsImpl::GetHistogramStats(const std::string& 
metric_name) const {
+    std::shared_ptr<Histogram> histogram;
+    {
+        std::lock_guard<std::mutex> guard(histogram_lock_);
+        auto iter = histograms_.find(metric_name);
+        if (iter == histograms_.end()) {
+            return Status::KeyError(fmt::format("histogram '{}' not found", 
metric_name));
+        }
+        histogram = iter->second;
+    }
+    return histogram->GetStats();
+}
+
+std::map<std::string, HistogramStats> MetricsImpl::GetAllHistogramStats() 
const {
+    std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> histograms;
+    {
+        std::lock_guard<std::mutex> guard(histogram_lock_);
+        histograms.reserve(histograms_.size());
+        for (const auto& kv : histograms_) {
+            histograms.push_back(kv);
+        }
+    }
+    std::map<std::string, HistogramStats> res;
+    for (const auto& [name, histogram] : histograms) {
+        res.emplace(name, histogram->GetStats());
+    }
+    return res;
+}
+
+void MetricsImpl::SetGauge(const std::string& metric_name, double value) {
+    std::lock_guard<std::mutex> lock(gauge_lock_);
+    gauges_[metric_name] = value;
+}
+
+Result<double> MetricsImpl::GetGauge(const std::string& metric_name) const {
+    std::lock_guard<std::mutex> lock(gauge_lock_);
+    auto it = gauges_.find(metric_name);
+    if (it != gauges_.end()) {
+        return it->second;
+    }
+    return Status::KeyError(fmt::format("metric '{}' not found", metric_name));
+}
+
+std::map<std::string, double> MetricsImpl::GetAllGauges() const {
+    std::lock_guard<std::mutex> lock(gauge_lock_);
+    return gauges_;
+}
+
+void MetricsImpl::Merge(const std::shared_ptr<Metrics>& other) {
+    if (other && this != other.get()) {
+        std::map<std::string, uint64_t> other_counters = 
other->GetAllCounters();
+        {
+            std::lock_guard<std::mutex> guard(counter_lock_);
+            for (const auto& kv : other_counters) {
+                auto iter = counters_.find(kv.first);
+                if (iter == counters_.end()) {
+                    counters_[kv.first] = kv.second;
+                } else {
+                    counters_[kv.first] += kv.second;
+                }
+            }
+        }
+        std::map<std::string, double> other_gauges = other->GetAllGauges();
+        {
+            std::lock_guard<std::mutex> guard(gauge_lock_);
+            for (const auto& kv : other_gauges) {
+                auto iter = gauges_.find(kv.first);
+                if (iter == gauges_.end()) {
+                    gauges_[kv.first] = kv.second;
+                } else {
+                    gauges_[kv.first] += kv.second;
+                }
+            }
+        }
+        auto other_impl = std::dynamic_pointer_cast<MetricsImpl>(other);
+        if (other_impl) {
+            std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> 
other_histograms;
+            {
+                std::lock_guard<std::mutex> guard(other_impl->histogram_lock_);
+                other_histograms.reserve(other_impl->histograms_.size());
+                for (const auto& kv : other_impl->histograms_) {
+                    other_histograms.push_back(kv);
+                }
+            }
+
+            for (const auto& [name, other_hist] : other_histograms) {
+                std::shared_ptr<Histogram> this_hist;
+                {
+                    std::lock_guard<std::mutex> guard(histogram_lock_);
+                    auto iter = histograms_.find(name);
+                    if (iter == histograms_.end()) {
+                        this_hist = std::make_shared<HistogramWindowingImpl>();
+                        histograms_.emplace(name, this_hist);
+                    } else {
+                        this_hist = iter->second;
+                    }
+                }
+                this_hist->Merge(*other_hist);
+            }
+        }
+    }
+}
+
+void MetricsImpl::Overwrite(const std::shared_ptr<Metrics>& other) {
+    if (other && this != other.get()) {
+        std::map<std::string, uint64_t> other_counters = 
other->GetAllCounters();
+        {
+            std::lock_guard<std::mutex> guard(counter_lock_);
+            counters_.swap(other_counters);
+        }
+        std::map<std::string, double> other_gauges = other->GetAllGauges();
+        {
+            std::lock_guard<std::mutex> guard(gauge_lock_);
+            gauges_.swap(other_gauges);
+        }
+
+        auto other_impl = std::dynamic_pointer_cast<MetricsImpl>(other);
+        std::map<std::string, std::shared_ptr<Histogram>> new_histograms;
+        if (other_impl) {
+            std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> 
other_histograms;
+            {
+                std::lock_guard<std::mutex> lock(other_impl->histogram_lock_);
+                other_histograms.reserve(other_impl->histograms_.size());
+                for (const auto& kv : other_impl->histograms_) {
+                    other_histograms.push_back(kv);
+                }
+            }
+            for (const auto& [name, histogram] : other_histograms) {
+                new_histograms.emplace(name, histogram->Clone());
+            }
+        }
+        {
+            std::lock_guard<std::mutex> lock(histogram_lock_);
+            histograms_.swap(new_histograms);
+        }
+    }
+}
+
+std::string MetricsImpl::ToString() const {
+    using RapidWriter =
+        rapidjson::Writer<rapidjson::StringBuffer, rapidjson::UTF8<>, 
rapidjson::ASCII<>>;
+    rapidjson::Document doc;
+    doc.SetObject();
+    rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
+
+    std::map<std::string, uint64_t> counters = GetAllCounters();
+    for (const auto& kv : counters) {
+        doc.AddMember(rapidjson::Value(kv.first, allocator), 
rapidjson::Value(kv.second),
+                      allocator);
+    }
+
+    // Flatten histogram stats into numeric key/value pairs to keep JSON 
schema simple.
+    // Example keys: "read.latency.p99", "read.latency.avg", 
"read.latency.count".
+    std::map<std::string, HistogramStats> hist_stats = GetAllHistogramStats();
+    for (const auto& [name, s] : hist_stats) {
+        doc.AddMember(rapidjson::Value(name + ".count", allocator), 
rapidjson::Value(s.count),
+                      allocator);

Review Comment:
   These `rapidjson::Value(std::string, allocator)` constructions depend on 
RapidJSON being built with std::string support enabled (e.g., 
`RAPIDJSON_HAS_STDSTRING`). To make this code robust across build 
configurations, construct string keys via `SetString(k.c_str(), 
SizeType(k.size()), allocator)` (or equivalent) rather than relying on 
std::string constructors.



##########
src/paimon/common/metrics/metrics_impl_test.cpp:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/metrics_impl.h"
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(MetricsImplTest, TestSimple) {
+    auto metrics = std::make_shared<MetricsImpl>();
+    metrics->SetCounter("some_metric", 100);
+    metrics->SetCounter("some_metric_2", 150);
+    ASSERT_OK_AND_ASSIGN(uint64_t counter, metrics->GetCounter("some_metric"));
+    ASSERT_EQ(100, counter);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric_2"));
+    ASSERT_EQ(150, counter);
+    auto other = std::make_shared<MetricsImpl>();
+    other->SetCounter("some_metric_2", 200);
+    other->SetCounter("some_metric_3", 300);
+    metrics->Merge(other);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric"));
+    ASSERT_EQ(100, counter);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric_2"));
+    ASSERT_EQ(350, counter);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric_3"));
+    ASSERT_EQ(300, counter);
+    metrics->Overwrite(other);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric_2"));
+    ASSERT_EQ(200, counter);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric_3"));
+    ASSERT_EQ(300, counter);
+    // check some_metric is not exist after overwrite

Review Comment:
   Fix grammar in comment: replace 'is not exist' with 'does not exist'.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to