This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 2c6dbe1 feat: add executor future helpers and metrics utilities (#47)
2c6dbe1 is described below
commit 2c6dbe12d6f0a07a25d1ebd50ab512f25496dc91
Author: Yonghao Fang <[email protected]>
AuthorDate: Thu Jun 4 14:31:18 2026 +0800
feat: add executor future helpers and metrics utilities (#47)
---
include/paimon/metrics.h | 87 ++++++
.../common/executor/default_executor_test.cpp | 127 +++++++++
src/paimon/common/executor/future.h | 92 +++++++
.../common/factories/factory_creator_test.cpp | 95 +++++++
src/paimon/common/factories/io_hook_test.cpp | 67 +++++
src/paimon/common/metrics/histogram.cpp | 222 +++++++++++++++
src/paimon/common/metrics/histogram.h | 92 +++++++
src/paimon/common/metrics/histogram_test.cpp | 244 ++++++++++++++++
src/paimon/common/metrics/histogram_windowing.cpp | 306 +++++++++++++++++++++
src/paimon/common/metrics/histogram_windowing.h | 84 ++++++
.../common/metrics/histogram_windowing_test.cpp | 211 ++++++++++++++
src/paimon/common/metrics/metrics_impl.cpp | 279 +++++++++++++++++++
src/paimon/common/metrics/metrics_impl.h | 81 ++++++
src/paimon/common/metrics/metrics_impl_test.cpp | 96 +++++++
14 files changed, 2083 insertions(+)
diff --git a/include/paimon/metrics.h b/include/paimon/metrics.h
new file mode 100644
index 0000000..1db5f5f
--- /dev/null
+++ b/include/paimon/metrics.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/type_fwd.h"
+
+namespace paimon {
+
+/// Statistics snapshot for a histogram metric.
+///
+/// Every field defaults to zero. Percentile values are estimated from internal
+/// buckets, so they are approximations rather than exact order statistics.
+struct PAIMON_EXPORT HistogramStats {
+    uint64_t count = 0;  ///< Number of recorded samples.
+    double sum = 0;      ///< Sum of the recorded samples.
+    double min = 0;      ///< Smallest recorded sample.
+    double max = 0;      ///< Largest recorded sample.
+    double average = 0;  ///< Arithmetic mean of the samples.
+    double p50 = 0;      ///< Estimated 50th percentile.
+    double p90 = 0;      ///< Estimated 90th percentile.
+    double p95 = 0;      ///< Estimated 95th percentile.
+    double p99 = 0;      ///< Estimated 99th percentile.
+    double p999 = 0;     ///< Estimated 99.9th percentile.
+    double stddev = 0;   ///< Standard deviation of the samples.
+};
+
+/// Abstract interface for collecting and managing performance metrics in Paimon operations.
+///
+/// Three families of metrics are exposed, each addressed by a string name:
+/// counters (uint64_t), histograms (samples summarized as HistogramStats) and
+/// gauges (double).
+class PAIMON_EXPORT Metrics {
+ public:
+    virtual ~Metrics() = default;
+
+    /// Set the value of a specific counter metric.
+    /// @param metric_name name of the counter.
+    /// @param metric_value value to store for the counter.
+    virtual void SetCounter(const std::string& metric_name, uint64_t metric_value) = 0;
+
+    /// Get the current value of a specific counter metric.
+    /// @param metric_name name of the counter.
+    /// @return the counter value wrapped in a Result.
+    /// NOTE(review): presumably an error Result when the counter does not exist — confirm
+    /// against the implementation.
+    virtual Result<uint64_t> GetCounter(const std::string& metric_name) const = 0;
+
+    /// Get all counter metrics as a map keyed by metric name.
+    virtual std::map<std::string, uint64_t> GetAllCounters() const = 0;
+
+    /// Add a sample to a histogram metric.
+    /// @param metric_name name of the histogram.
+    /// @param value sample to record.
+    virtual void ObserveHistogram(const std::string& metric_name, double value) = 0;
+
+    /// Get histogram statistics snapshot.
+    /// @param metric_name name of the histogram.
+    /// @return the statistics wrapped in a Result.
+    /// NOTE(review): presumably an error Result when the histogram does not exist — confirm
+    /// against the implementation.
+    virtual Result<HistogramStats> GetHistogramStats(const std::string& metric_name) const = 0;
+
+    /// Get all histogram statistics snapshots as a map keyed by metric name.
+    virtual std::map<std::string, HistogramStats> GetAllHistogramStats() const = 0;
+
+    /// Set the value of a specific gauge metric.
+    /// @param metric_name name of the gauge.
+    /// @param metric_value value to store for the gauge.
+    virtual void SetGauge(const std::string& metric_name, double metric_value) = 0;
+
+    /// Get the current value of a specific gauge metric.
+    /// @param metric_name name of the gauge.
+    /// @return the gauge value wrapped in a Result.
+    /// NOTE(review): presumably an error Result when the gauge does not exist — confirm
+    /// against the implementation.
+    virtual Result<double> GetGauge(const std::string& metric_name) const = 0;
+
+    /// Get all gauge metrics as a map keyed by metric name.
+    virtual std::map<std::string, double> GetAllGauges() const = 0;
+
+    /// Merge metrics from another Metrics instance into this one.
+    /// How metrics present on both sides are combined is decided by the implementation.
+    virtual void Merge(const std::shared_ptr<Metrics>& other) = 0;
+
+    /// Convert all metrics to a JSON string representation.
+    virtual std::string ToString() const = 0;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/executor/default_executor_test.cpp
b/src/paimon/common/executor/default_executor_test.cpp
new file mode 100644
index 0000000..26d84f8
--- /dev/null
+++ b/src/paimon/common/executor/default_executor_test.cpp
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+#include <future>
+#include <memory>
+#include <stdexcept>
+#include <thread>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/executor.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon::test {
+
+// Runs ten void tasks through the global default executor and checks that every one ran.
+TEST(DefaultExecutorTest, TestViaVoidFunc) {
+    auto pool = GetGlobalDefaultExecutor();
+    std::atomic<int64_t> counter{0};
+    std::vector<std::future<void>> pending;
+    int32_t submitted = 0;
+    while (submitted < 10) {
+        pending.push_back(Via(pool.get(), [&counter]() { counter++; }));
+        ++submitted;
+    }
+    // Wait() blocks until every task has finished, so the counter is final afterwards.
+    Wait(pending);
+    ASSERT_EQ(10, counter.load());
+}
+
+// Runs ten value-returning tasks and checks that CollectAll() yields the results in
+// submission order.
+TEST(DefaultExecutorTest, TestVia) {
+    auto pool = GetGlobalDefaultExecutor();
+    std::atomic<int64_t> invocations{0};
+    std::vector<std::future<int>> pending;
+    std::vector<int> expected;
+    for (int32_t i = 0; i < 10; ++i) {
+        expected.push_back(i * 2);
+        pending.push_back(Via(pool.get(), [i, &invocations]() -> int32_t {
+            invocations++;
+            return i * 2;
+        }));
+    }
+    auto results = CollectAll(pending);
+    ASSERT_EQ(10, results.size());
+    ASSERT_EQ(expected, results);
+    ASSERT_EQ(10, invocations.load());
+}
+
+// Submits tasks that return Result<...>, two failing and two succeeding; only the number
+// of collected results is verified.
+TEST(DefaultExecutorTest, TestViaWithResult) {
+    using Payload = std::vector<int32_t>;
+    auto pool = GetGlobalDefaultExecutor();
+    std::vector<std::future<Result<Payload>>> pending;
+    std::vector<int32_t> inputs = {-2, -1, 1, 2};
+    for (const auto& input : inputs) {
+        pending.push_back(Via(pool.get(), [input]() -> Result<Payload> {
+            if (input <= 0) {
+                return Status::Invalid("negative");
+            }
+            Payload output = {-2, -1, 1, 2};
+            return output;
+        }));
+    }
+    auto results = CollectAll(pending);
+    ASSERT_EQ(4, results.size());
+}
+
+// An exception thrown inside the task must resurface from future.get().
+TEST(DefaultExecutorTest, TestViaWithException) {
+    auto pool = GetGlobalDefaultExecutor();
+    auto pending = Via(pool.get(), []() { throw std::runtime_error("test"); });
+    ASSERT_THROW(pending.get(), std::runtime_error);
+}
+
+// Checks that ShutdownNow() lets the task that is already running finish but never runs
+// the tasks still waiting in the queue.
+TEST(DefaultExecutorTest, TestShutdownNowDropsPendingTasks) {
+    // A single worker thread, so all later tasks queue up behind the first one.
+    auto executor = CreateDefaultExecutor(/*thread_count=*/1);
+    std::atomic<bool> first_started = false;
+    std::atomic<int32_t> executed_count = 0;
+    std::promise<void> release_first_task;
+    auto release_future = release_first_task.get_future();
+    // First task: signal that it started, then block until the test releases it.
+    executor->Add([&]() {
+        first_started.store(true);
+        release_future.wait();
+        ++executed_count;
+    });
+
+    // Twenty more tasks that would each bump the counter if they ever ran.
+    for (int32_t index = 0; index < 20; ++index) {
+        executor->Add([&]() { ++executed_count; });
+    }
+
+    // Poll (at most 100 x 100 ms) until the first task is running on the worker.
+    for (int32_t retry = 0; retry < 100 && !first_started.load(); ++retry) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+    ASSERT_TRUE(first_started.load());
+    // ShutdownNow() is issued from a helper thread while the first task is still blocked.
+    // NOTE(review): presumably ShutdownNow() blocks until the running task finishes,
+    // which is why it cannot be called on the test thread — confirm.
+    std::thread shutdown_thread([&]() { executor->ShutdownNow(); });
+    // Give the shutdown request time to take effect before unblocking the first task.
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    release_first_task.set_value();
+    shutdown_thread.join();
+
+    // Only the first (already running) task may have executed.
+    ASSERT_EQ(executed_count.load(), 1);
+}
+
+// A task submitted after ShutdownNow() must never be executed.
+TEST(DefaultExecutorTest, TestAddTaskAfterShutdownNowIgnored) {
+    std::atomic<int32_t> runs = 0;
+    auto pool = CreateDefaultExecutor(/*thread_count=*/1);
+
+    pool->ShutdownNow();
+    pool->Add([&]() { ++runs; });
+
+    // Short grace period in which a wrongly accepted task would get the chance to run.
+    const auto grace_period = std::chrono::milliseconds(10);
+    std::this_thread::sleep_for(grace_period);
+    ASSERT_EQ(runs.load(), 0);
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/executor/future.h
b/src/paimon/common/executor/future.h
new file mode 100644
index 0000000..91f8d70
--- /dev/null
+++ b/src/paimon/common/executor/future.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <functional>
+#include <future>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "paimon/executor.h"
+
+namespace paimon {
+
+/// Submits a function to be executed asynchronously on a given executor and returns a future.
+///
+/// The callable is moved (or copied, when passed as an lvalue) into a task handed to
+/// `executor->Add`. Its result is delivered through a std::promise that fulfils the
+/// returned std::future. If the callable throws, the exception is captured and stored in
+/// the promise, so it is rethrown from `future.get()`.
+///
+/// @param executor executor that runs the task; must not be null (asserted in debug builds).
+/// @param func nullary callable; must not be an empty std::function (asserted in debug
+///        builds).
+/// @return a future for the callable's return value (std::future<void> when it returns void).
+template <typename Func>
+auto Via(Executor* executor, Func&& func) -> std::future<std::invoke_result_t<Func>> {
+    using ResultType = std::invoke_result_t<Func>;
+
+    static_assert(std::is_invocable_v<Func>, "func must be callable");
+
+    assert(executor != nullptr && "executor cannot be null");
+
+#ifndef NDEBUG
+    // Debug-only sanity check. Wrapping the callable in a std::function copies it (and may
+    // allocate), so the probe is compiled out together with the assert that consumes it
+    // instead of being paid for on every submission in release builds.
+    if constexpr (std::is_constructible_v<std::function<void()>, Func>) {
+        std::function<void()> test_func = func;
+        assert(test_func && "func cannot be an empty std::function");
+    }
+#endif
+
+    // The promise is shared between this frame (which hands out the future) and the task.
+    auto promise = std::make_shared<std::promise<ResultType>>();
+    auto future = promise->get_future();
+
+    executor->Add([promise, func = std::forward<Func>(func)]() mutable {
+        try {
+            if constexpr (std::is_void_v<ResultType>) {
+                func();
+                promise->set_value();
+            } else {
+                promise->set_value(func());
+            }
+        } catch (...) {
+            // Forward any failure to whoever waits on the future.
+            promise->set_exception(std::current_exception());
+        }
+    });
+
+    return future;
+}
+
+/// Collects the results of multiple futures.
+///
+/// Calls `get()` on each future in order, blocking until it is ready, and returns the
+/// values in the same order. An exception stored in a future propagates out of this call.
+/// Every visited future is left invalid, as `std::future::get()` consumes its state.
+template <typename T>
+std::vector<T> CollectAll(std::vector<std::future<T>>& futures) {
+    std::vector<T> collected;
+    collected.reserve(futures.size());
+    for (auto it = futures.begin(); it != futures.end(); ++it) {
+        collected.push_back(it->get());
+    }
+    return collected;
+}
+
+/// Waits for all futures with void return type to complete.
+///
+/// Futures without shared state (`valid() == false`) are skipped. For the others `get()`
+/// is called, which blocks until completion and rethrows any stored exception.
+inline void Wait(std::vector<std::future<void>>& futures) {
+    for (auto it = futures.begin(); it != futures.end(); ++it) {
+        if (!it->valid()) {
+            continue;
+        }
+        it->get();
+    }
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/factories/factory_creator_test.cpp
b/src/paimon/common/factories/factory_creator_test.cpp
new file mode 100644
index 0000000..96ed1dd
--- /dev/null
+++ b/src/paimon/common/factories/factory_creator_test.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/factories/factory_creator.h"
+
+#include <algorithm>
+#include <new>
+
+#include "gtest/gtest.h"
+#include "paimon/factories/factory.h"
+
+namespace paimon::test {
+
+// Minimal Factory for the tests below: a fixed identifier plus a caller-chosen name that
+// lets the tests tell instances apart.
+class MockFactory : public Factory {
+ public:
+    explicit MockFactory(const std::string& name) : name_(name) {}
+
+    // Every mock reports the same identifier.
+    const char* Identifier() const override { return "mock"; }
+
+    // Returns a copy of the name given at construction.
+    std::string GetName() const { return name_; }
+
+ private:
+    std::string name_;
+};
+
+// Fixture that runs each test against the instance returned by FactoryCreator::GetInstance().
+class FactoryCreatorTest : public ::testing::Test {
+ protected:
+    // Fetch the shared FactoryCreator before each test.
+    void SetUp() override {
+        factory_creator_ = FactoryCreator::GetInstance();
+    }
+
+    // Destroy the FactoryCreator in place and default-construct a new one at the same
+    // address, so the pointer handed out by GetInstance() keeps pointing at a live object.
+    // NOTE(review): presumably this is what clears the registrations a test made before
+    // the next test runs — confirm against FactoryCreator.
+    void TearDown() override {
+        factory_creator_->~FactoryCreator();
+        new (factory_creator_) FactoryCreator();
+    }
+
+    // Non-owning pointer to the shared instance; assigned in SetUp().
+    FactoryCreator* factory_creator_;
+};
+
+// Factories registered under distinct types must be handed back under those same types.
+TEST_F(FactoryCreatorTest, RegisterAndCreateFactory) {
+    auto first = new MockFactory("Factory1");
+    auto second = new MockFactory("Factory2");
+    factory_creator_->Register("type1", first);
+    factory_creator_->Register("type2", second);
+
+    Factory* looked_up_first = factory_creator_->Create("type1");
+    Factory* looked_up_second = factory_creator_->Create("type2");
+    ASSERT_NE(looked_up_first, nullptr);
+    ASSERT_NE(looked_up_second, nullptr);
+
+    // Only MockFactory instances were registered, so the downcasts are safe.
+    const auto* mock_first = static_cast<MockFactory*>(looked_up_first);
+    const auto* mock_second = static_cast<MockFactory*>(looked_up_second);
+    EXPECT_EQ(mock_first->GetName(), "Factory1");
+    EXPECT_EQ(mock_second->GetName(), "Factory2");
+}
+
+// GetRegisteredType() must list exactly the types that were registered.
+TEST_F(FactoryCreatorTest, GetRegisteredType) {
+    factory_creator_->Register("type1", new MockFactory("Factory1"));
+    factory_creator_->Register("type2", new MockFactory("Factory2"));
+
+    std::vector<std::string> registered_types = factory_creator_->GetRegisteredType();
+    EXPECT_EQ(registered_types.size(), 2);
+
+    const auto first_type = std::find(registered_types.begin(), registered_types.end(), "type1");
+    EXPECT_NE(first_type, registered_types.end());
+    const auto second_type = std::find(registered_types.begin(), registered_types.end(), "type2");
+    EXPECT_NE(second_type, registered_types.end());
+}
+
+// Looking up a type that was never registered yields a null factory.
+TEST_F(FactoryCreatorTest, CreateNonExistentFactory) {
+    Factory* missing = factory_creator_->Create("nonexistent");
+    EXPECT_EQ(missing, nullptr);
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/factories/io_hook_test.cpp
b/src/paimon/common/factories/io_hook_test.cpp
new file mode 100644
index 0000000..9bbb1b3
--- /dev/null
+++ b/src/paimon/common/factories/io_hook_test.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/factories/io_hook.h"
+
+#include <stdexcept>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// In RETURN_ERROR mode each Try() reports a failure status and is still counted.
+TEST(IOHookTest, TestReturnErrorMode) {
+    auto io_hook = IOHook::GetInstance();
+    io_hook->Reset(0, IOHook::Mode::RETURN_ERROR);
+    for (int attempt = 0; attempt < 2; ++attempt) {
+        ASSERT_NOK(io_hook->Try("path"));
+    }
+    ASSERT_EQ(2, io_hook->IOCount());
+    io_hook->Clear();
+}
+
+// In SILENT mode each Try() succeeds and is still counted.
+TEST(IOHookTest, TestSilentMode) {
+    auto io_hook = IOHook::GetInstance();
+    io_hook->Reset(0, IOHook::Mode::SILENT);
+    for (int attempt = 0; attempt < 2; ++attempt) {
+        ASSERT_OK(io_hook->Try("path"));
+    }
+    ASSERT_EQ(2, io_hook->IOCount());
+    io_hook->Clear();
+}
+
+// Two consecutive GetInstance() calls must return the same hook.
+TEST(IOHookTest, TestSingleton) {
+    auto first = IOHook::GetInstance();
+    auto second = IOHook::GetInstance();
+    ASSERT_EQ(first, second);
+    first->Clear();
+}
+
+// In THROW_EXCEPTION mode each Try() throws std::runtime_error and is still counted.
+TEST(IOHookTest, TestThrowExceptionMode) {
+    auto io_hook = IOHook::GetInstance();
+    io_hook->Reset(0, IOHook::Mode::THROW_EXCEPTION);
+    // Wrapper that discards the returned status so Try() can be used inside EXPECT_THROW.
+    auto attempt = [io_hook]() {
+        auto status = io_hook->Try("path");
+        (void)status;
+    };
+    EXPECT_THROW(attempt(), std::runtime_error);
+    EXPECT_THROW(attempt(), std::runtime_error);
+    ASSERT_EQ(2, io_hook->IOCount());
+    io_hook->Clear();
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/metrics/histogram.cpp
b/src/paimon/common/metrics/histogram.cpp
new file mode 100644
index 0000000..73ccbae
--- /dev/null
+++ b/src/paimon/common/metrics/histogram.cpp
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/histogram.h"
+
+#include <algorithm>
+#include <cmath>
+
+namespace paimon {
+
+// Allocates one zero-initialized counter per bucket boundary returned by BucketLimits().
+HistogramImpl::HistogramImpl()
+    : bucket_counts_(BucketLimits().size()) {}
+
+// Records one sample: bumps its bucket counter and updates count, sum, sum of squares,
+// min and max under the mutex. NaN samples are ignored.
+void HistogramImpl::Add(double value) {
+    if (std::isnan(value)) {
+        return;
+    }
+    std::lock_guard<std::mutex> guard(mu_);
+    const auto& limits = BucketLimits();
+    bucket_counts_[BucketIndex(value, limits)] += 1;
+    const bool first_sample = (num_ == 0);
+    num_ += 1;
+    sum_ += value;
+    sum_squares_ += value * value;
+    if (first_sample) {
+        // Seed both extremes from the first sample instead of comparing against the
+        // initial sentinels; otherwise a histogram that only ever sees negative samples
+        // keeps reporting the initial max of 0.
+        min_ = value;
+        max_ = value;
+    } else {
+        min_ = std::min(min_, value);
+        max_ = std::max(max_, value);
+    }
+}
+
+// Takes a consistent snapshot of the internal state and derives the statistics from it.
+HistogramStats HistogramImpl::GetStats() const {
+    const Snapshot snapshot = GetSnapshot();
+    return ToStats(snapshot);
+}
+
+// Copies the internal state under the mutex. For an empty histogram min and max are
+// reported as 0 rather than as the internal sentinels.
+HistogramImpl::Snapshot HistogramImpl::GetSnapshot() const {
+    std::lock_guard<std::mutex> guard(mu_);
+    const bool empty = (num_ == 0);
+    Snapshot snapshot;
+    snapshot.bucket_counts = bucket_counts_;
+    snapshot.count = num_;
+    snapshot.sum = sum_;
+    snapshot.sum_squares = sum_squares_;
+    snapshot.min = empty ? 0 : min_;
+    snapshot.max = empty ? 0 : max_;
+    return snapshot;
+}
+
+// Merges another histogram into this one. Only HistogramImpl sources are supported; any
+// other Histogram implementation is silently ignored.
+void HistogramImpl::Merge(const Histogram& other) {
+    if (const auto* impl = dynamic_cast<const HistogramImpl*>(&other)) {
+        MergeFrom(*impl);
+    }
+}
+
+// Adds the samples recorded by `other` to this histogram.
+void HistogramImpl::MergeFrom(const HistogramImpl& other) {
+    // Snapshot the source first: its mutex is released again before ours is taken, so
+    // the two locks are never held at the same time.
+    const Snapshot incoming = other.GetSnapshot();
+    std::lock_guard<std::mutex> guard(mu_);
+    const size_t bucket_total = bucket_counts_.size();
+    if (bucket_total != incoming.bucket_counts.size()) {
+        // Should never happen unless bucket config changes.
+        return;
+    }
+    if (incoming.count == 0) {
+        return;
+    }
+    for (size_t bucket = 0; bucket < bucket_total; ++bucket) {
+        bucket_counts_[bucket] += incoming.bucket_counts[bucket];
+    }
+    // While this histogram is empty its own min/max are sentinels, so adopt the
+    // incoming extremes instead of comparing against them.
+    const bool was_empty = (num_ == 0);
+    min_ = was_empty ? incoming.min : std::min(min_, incoming.min);
+    max_ = was_empty ? incoming.max : std::max(max_, incoming.max);
+    num_ += incoming.count;
+    sum_ += incoming.sum;
+    sum_squares_ += incoming.sum_squares;
+}
+
+// Returns an independent deep copy of this histogram, built from a consistent snapshot.
+std::shared_ptr<Histogram> HistogramImpl::Clone() const {
+    Snapshot s = GetSnapshot();
+    const bool empty = (s.count == 0);
+    auto cloned = std::make_shared<HistogramImpl>();
+    std::lock_guard<std::mutex> guard(cloned->mu_);
+    cloned->num_ = s.count;
+    cloned->sum_ = s.sum;
+    cloned->sum_squares_ = s.sum_squares;
+    // A snapshot of an empty histogram reports min = max = 0. Those are placeholders,
+    // not observations, so restore neutral sentinels for both extremes; copying the 0
+    // into max_ would make later negative-only samples report a max of 0.
+    cloned->min_ = empty ? std::numeric_limits<double>::infinity() : s.min;
+    cloned->max_ = empty ? -std::numeric_limits<double>::infinity() : s.max;
+    cloned->bucket_counts_ = std::move(s.bucket_counts);
+    return cloned;
+}
+
+// Derives the public statistics from a snapshot: count/sum/min/max are copied, the
+// average is sum / count (0 for an empty snapshot), and the percentiles and standard
+// deviation are estimated.
+HistogramStats HistogramImpl::ToStats(const Snapshot& s) {
+    HistogramStats result;
+    result.count = s.count;
+    result.sum = s.sum;
+    result.min = s.min;
+    result.max = s.max;
+    if (s.count == 0) {
+        result.average = 0;
+    } else {
+        result.average = s.sum / static_cast<double>(s.count);
+    }
+    result.p50 = EstimatePercentile(s, 0.50);
+    result.p90 = EstimatePercentile(s, 0.90);
+    result.p95 = EstimatePercentile(s, 0.95);
+    result.p99 = EstimatePercentile(s, 0.99);
+    result.p999 = EstimatePercentile(s, 0.999);
+    result.stddev = EstimateStddev(s);
+    return result;
+}
+
+// Returns the shared, lazily built table of bucket upper bounds.
+const std::vector<double>& HistogramImpl::BucketLimits() {
+    static const std::vector<double> limits = []() {
+        // Keep bucket generation aligned with RocksDB's HistogramBucketMapper
+        // (see rocksdb/monitoring/histogram.cc).
+        //
+        // The table starts with {1, 2}; every further boundary is 1.5x the previous
+        // unrounded value, truncated to an integer and reduced to its two most
+        // significant digits so that the boundaries stay human-readable.
+        std::vector<double> bounds;
+        bounds.reserve(256);
+        bounds.push_back(1);
+        bounds.push_back(2);
+
+        const auto ceiling = static_cast<double>(std::numeric_limits<uint64_t>::max());
+        for (double raw = 1.5 * bounds.back(); raw <= ceiling; raw = 1.5 * raw) {
+            auto leading = static_cast<uint64_t>(raw);
+            // Keep two most significant digits (e.g., 172 -> 170).
+            uint64_t scale = 1;
+            for (; leading / 10 > 10; leading /= 10) {
+                scale *= 10;
+            }
+            bounds.push_back(static_cast<double>(leading * scale));
+        }
+        return bounds;
+    }();
+    return limits;
+}
+
+// Maps a sample to the index of the first bucket whose upper bound is >= value. Samples
+// at or below the first bound land in bucket 0; samples above every bound land in the
+// last bucket.
+size_t HistogramImpl::BucketIndex(double value, const std::vector<double>& limits) {
+    if (value <= limits.front()) {
+        return 0;
+    }
+    const auto first_not_less = std::lower_bound(limits.begin(), limits.end(), value);
+    const auto offset = static_cast<size_t>(std::distance(limits.begin(), first_not_less));
+    // lower_bound() yields end() for values beyond the table; clamp to the last bucket.
+    return std::min(offset, limits.size() - 1);
+}
+
+// Estimates the quantile `p` (a fraction, e.g. 0.99) from the bucket counts of a
+// snapshot. Returns 0 for an empty snapshot, min for p <= 0 and max for p >= 1; every
+// other estimate is clamped to [min, max].
+double HistogramImpl::EstimatePercentile(const Snapshot& s, double p) {
+    if (s.count == 0) {
+        return 0;
+    }
+    if (p <= 0) {
+        return s.min;
+    }
+    if (p >= 1) {
+        return s.max;
+    }
+    const auto& limits = BucketLimits();
+    // 1-based position of the sample that sits at quantile p.
+    const auto target_rank = static_cast<uint64_t>(std::ceil(p * static_cast<double>(s.count)));
+    uint64_t seen = 0;
+    for (size_t i = 0; i < s.bucket_counts.size(); ++i) {
+        const uint64_t in_bucket = s.bucket_counts[i];
+        const uint64_t seen_before = seen;
+        seen += in_bucket;
+        // Skip empty buckets and buckets that end before the target rank.
+        if (in_bucket == 0 || seen < target_rank) {
+            continue;
+        }
+
+        // Interpolate inside bucket: estimate that samples are uniformly distributed
+        // between bucket lower and upper bounds.
+        const double upper = (i < limits.size()) ? limits[i] : s.max;
+        const double lower = (i == 0) ? std::min(s.min, 0.0) : limits[i - 1];
+        if (std::isinf(upper)) {
+            return s.max;
+        }
+        const double fraction =
+            static_cast<double>(target_rank - seen_before) / static_cast<double>(in_bucket);
+        double estimate = lower + (upper - lower) * fraction;
+        // Bucket bounds can lie outside the observed range; never report beyond it.
+        if (estimate < s.min) {
+            estimate = s.min;
+        }
+        if (estimate > s.max) {
+            estimate = s.max;
+        }
+        return estimate;
+    }
+    return s.max;
+}
+
+// Standard deviation computed as sqrt(E[x^2] - E[x]^2) from the running sums. The
+// variance is clamped at 0 before the square root is taken. Returns 0 for an empty
+// snapshot.
+double HistogramImpl::EstimateStddev(const Snapshot& s) {
+    if (s.count == 0) {
+        return 0;
+    }
+    const auto samples = static_cast<double>(s.count);
+    const double mean = s.sum / samples;
+    const double mean_of_squares = s.sum_squares / samples;
+    const double variance = std::max(0.0, mean_of_squares - mean * mean);
+    return std::sqrt(variance);
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/metrics/histogram.h
b/src/paimon/common/metrics/histogram.h
new file mode 100644
index 0000000..aec6178
--- /dev/null
+++ b/src/paimon/common/metrics/histogram.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "paimon/metrics.h"
+
+namespace paimon {
+
+// Internal histogram interface used by MetricsImpl.
+//
+// Note: this interface is intentionally NOT exported.
+class Histogram {
+ public:
+    virtual ~Histogram() = default;
+
+    // Records one sample.
+    virtual void Add(double value) = 0;
+    // Returns a statistics snapshot of the samples recorded so far.
+    virtual HistogramStats GetStats() const = 0;
+    // Folds the samples of `other` into this histogram.
+    virtual void Merge(const Histogram& other) = 0;
+    // Returns a copy of this histogram behind a new shared_ptr.
+    virtual std::shared_ptr<Histogram> Clone() const = 0;
+};
+
+// Internal histogram implementation.
+//
+// The bucket boundaries are aligned with RocksDB's HistogramBucketMapper
+// (rocksdb/monitoring/histogram.cc).
+//
+// All state is guarded by an internal mutex.
+class HistogramImpl : public Histogram {
+ public:
+    HistogramImpl();
+
+    // Records one sample.
+    void Add(double value) override;
+
+    // Returns the statistics derived from a snapshot of the current state.
+    HistogramStats GetStats() const override;
+
+    // Merges `other` into this histogram.
+    void Merge(const Histogram& other) override;
+
+    // Returns an independent copy of this histogram.
+    std::shared_ptr<Histogram> Clone() const override;
+
+    // Plain copy of the internal state, detached from the mutex.
+    struct Snapshot {
+        uint64_t count = 0;                   // Number of recorded samples.
+        double sum = 0;                       // Sum of the samples.
+        double sum_squares = 0;               // Sum of the squared samples.
+        double min = 0;                       // Smallest sample.
+        double max = 0;                       // Largest sample.
+        std::vector<uint64_t> bucket_counts;  // Per-bucket sample counts.
+    };
+
+    // Copies the current state under the mutex.
+    Snapshot GetSnapshot() const;
+
+    // Adds the samples recorded by `other` to this histogram.
+    void MergeFrom(const HistogramImpl& other);
+
+    // Converts a snapshot into the public statistics structure.
+    static HistogramStats ToStats(const Snapshot& s);
+
+ private:
+    static const std::vector<double>& BucketLimits();
+    static size_t BucketIndex(double value, const std::vector<double>& limits);
+    static double EstimatePercentile(const Snapshot& s, double p);
+    static double EstimateStddev(const Snapshot& s);
+
+ private:
+    mutable std::mutex mu_;
+    uint64_t num_ = 0;
+    double sum_ = 0;
+    double sum_squares_ = 0;
+    // min_/max_ start at the neutral elements of min()/max(), so the first sample always
+    // replaces them. max_ must be -infinity rather than 0; with 0 a histogram that only
+    // sees negative samples would report a max of 0.
+    double min_ = std::numeric_limits<double>::infinity();
+    double max_ = -std::numeric_limits<double>::infinity();
+    std::vector<uint64_t> bucket_counts_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/metrics/histogram_test.cpp
b/src/paimon/common/metrics/histogram_test.cpp
new file mode 100644
index 0000000..2833480
--- /dev/null
+++ b/src/paimon/common/metrics/histogram_test.cpp
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/histogram.h"
+
+#include <algorithm>
+#include <cmath>
+#include <limits>
+#include <random>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/testing/utils/testharness.h"
+#include "rapidjson/document.h"
+
+namespace paimon::test {
+
+// Three samples that each sit exactly on a bucket boundary: every statistic is exact.
+TEST(HistogramImplTest, TestBasicStats) {
+    HistogramImpl histogram;
+    for (const double sample : {1.0, 2.0, 3.0}) {
+        histogram.Add(sample);
+    }
+
+    const HistogramStats stats = histogram.GetStats();
+    EXPECT_EQ(stats.count, 3);
+    EXPECT_DOUBLE_EQ(stats.min, 1);
+    EXPECT_DOUBLE_EQ(stats.max, 3);
+    EXPECT_NEAR(stats.average, 2.0, 1e-12);
+    EXPECT_NEAR(stats.stddev, std::sqrt(2.0 / 3.0), 1e-12);
+    EXPECT_DOUBLE_EQ(stats.p50, 2);
+    EXPECT_DOUBLE_EQ(stats.p90, 3);
+    EXPECT_DOUBLE_EQ(stats.p95, 3);
+    EXPECT_DOUBLE_EQ(stats.p99, 3);
+    EXPECT_DOUBLE_EQ(stats.p999, 3);
+}
+
+// Feeds 20000 pseudo-random integers (fixed seed) into the histogram and compares the
+// moment-based statistics with values accumulated independently in the same order.
+TEST(HistogramImplTest, TestLargeDatasetExactMoments) {
+    const int32_t sample_count = 20000;
+    HistogramImpl histogram;
+
+    std::mt19937 engine(12345);
+    std::uniform_int_distribution<int> pick(-100, 5000);
+
+    double expected_sum = 0;
+    double expected_sum_squares = 0;
+    double expected_min = std::numeric_limits<double>::infinity();
+    double expected_max = -std::numeric_limits<double>::infinity();
+    for (int32_t i = 0; i < sample_count; ++i) {
+        const auto sample = static_cast<double>(pick(engine));
+        histogram.Add(sample);
+        expected_sum += sample;
+        expected_sum_squares += sample * sample;
+        expected_min = std::min(expected_min, sample);
+        expected_max = std::max(expected_max, sample);
+    }
+
+    const HistogramStats stats = histogram.GetStats();
+    EXPECT_EQ(stats.count, static_cast<uint64_t>(sample_count));
+    EXPECT_DOUBLE_EQ(stats.sum, expected_sum);
+    EXPECT_DOUBLE_EQ(stats.min, expected_min);
+    EXPECT_DOUBLE_EQ(stats.max, expected_max);
+
+    const double expected_mean = expected_sum / static_cast<double>(sample_count);
+    const double mean_of_squares = expected_sum_squares / static_cast<double>(sample_count);
+    const double variance = std::max(0.0, mean_of_squares - expected_mean * expected_mean);
+    const double expected_stddev = std::sqrt(variance);
+    EXPECT_DOUBLE_EQ(stats.average, expected_mean);
+    EXPECT_DOUBLE_EQ(stats.stddev, expected_stddev);
+
+    // Percentiles should be inside [min, max] and monotonically increasing.
+    EXPECT_GE(stats.p50, stats.min);
+    EXPECT_LE(stats.p50, stats.max);
+    EXPECT_GE(stats.p90, stats.p50);
+    EXPECT_GE(stats.p95, stats.p90);
+    EXPECT_GE(stats.p99, stats.p95);
+    EXPECT_GE(stats.p999, stats.p99);
+    EXPECT_LE(stats.p999, stats.max);
+}
+
+// Splits one data set across two histograms, merges them, and expects the same
+// statistics as a single histogram that saw every sample.
+TEST(HistogramImplTest, TestMergeMatchesSingleHistogram) {
+    const int32_t sample_count = 50000;
+    std::mt19937 engine(7);
+    std::uniform_int_distribution<int> pick(0, 100000);
+
+    std::vector<double> samples;
+    samples.reserve(sample_count);
+    for (int32_t i = 0; i < sample_count; ++i) {
+        samples.push_back(static_cast<double>(pick(engine)));
+    }
+
+    HistogramImpl even_half;
+    HistogramImpl odd_half;
+    HistogramImpl reference;
+
+    // Even positions go to one half and odd positions to the other; the reference
+    // histogram receives everything.
+    for (int32_t i = 0; i < sample_count; ++i) {
+        const double sample = samples[i];
+        reference.Add(sample);
+        HistogramImpl& half = (i % 2 == 0) ? even_half : odd_half;
+        half.Add(sample);
+    }
+    even_half.Merge(odd_half);
+
+    const HistogramStats merged = even_half.GetStats();
+    const HistogramStats single = reference.GetStats();
+
+    EXPECT_EQ(merged.count, single.count);
+    EXPECT_DOUBLE_EQ(merged.sum, single.sum);
+    EXPECT_DOUBLE_EQ(merged.min, single.min);
+    EXPECT_DOUBLE_EQ(merged.max, single.max);
+    EXPECT_DOUBLE_EQ(merged.average, single.average);
+    EXPECT_DOUBLE_EQ(merged.stddev, single.stddev);
+    EXPECT_DOUBLE_EQ(merged.p50, single.p50);
+    EXPECT_DOUBLE_EQ(merged.p90, single.p90);
+    EXPECT_DOUBLE_EQ(merged.p95, single.p95);
+    EXPECT_DOUBLE_EQ(merged.p99, single.p99);
+    EXPECT_DOUBLE_EQ(merged.p999, single.p999);
+}
+
+// Clone() must produce a histogram with the same snapshot and statistics as its source,
+// and afterwards the two objects must evolve independently of each other.
+TEST(HistogramImplTest, TestCloneConsistencyAndIndependence) {
+    HistogramImpl h;
+    // Include a mix of values across buckets; NaN should be ignored.
+    const std::vector<double> values = {1, 2, 3, 10,
+                                        100, 0.5, -5, std::numeric_limits<double>::quiet_NaN()};
+    for (double v : values) {
+        h.Add(v);
+    }
+
+    // Clone() returns the Histogram interface; downcast to reach GetSnapshot().
+    auto cloned_base = h.Clone();
+    auto cloned = std::dynamic_pointer_cast<HistogramImpl>(cloned_base);
+    ASSERT_TRUE(cloned != nullptr);
+
+    // Capture original state after Clone() for easier post-mortem inspection.
+    // Clone() should not mutate the source.
+    const auto after_snapshot = h.GetSnapshot();
+    const auto after_stats = h.GetStats();
+
+    // The raw state must match field by field, including the per-bucket counts.
+    const auto cloned_snapshot = cloned->GetSnapshot();
+    EXPECT_EQ(cloned_snapshot.count, after_snapshot.count);
+    EXPECT_DOUBLE_EQ(cloned_snapshot.sum, after_snapshot.sum);
+    EXPECT_DOUBLE_EQ(cloned_snapshot.sum_squares, after_snapshot.sum_squares);
+    EXPECT_DOUBLE_EQ(cloned_snapshot.min, after_snapshot.min);
+    EXPECT_DOUBLE_EQ(cloned_snapshot.max, after_snapshot.max);
+    EXPECT_EQ(cloned_snapshot.bucket_counts, after_snapshot.bucket_counts);
+
+    // The derived statistics must match as well.
+    const auto cloned_stats = cloned->GetStats();
+    EXPECT_EQ(cloned_stats.count, after_stats.count);
+    EXPECT_DOUBLE_EQ(cloned_stats.sum, after_stats.sum);
+    EXPECT_DOUBLE_EQ(cloned_stats.min, after_stats.min);
+    EXPECT_DOUBLE_EQ(cloned_stats.max, after_stats.max);
+    EXPECT_DOUBLE_EQ(cloned_stats.average, after_stats.average);
+    EXPECT_DOUBLE_EQ(cloned_stats.stddev, after_stats.stddev);
+    EXPECT_DOUBLE_EQ(cloned_stats.p50, after_stats.p50);
+    EXPECT_DOUBLE_EQ(cloned_stats.p90, after_stats.p90);
+    EXPECT_DOUBLE_EQ(cloned_stats.p95, after_stats.p95);
+    EXPECT_DOUBLE_EQ(cloned_stats.p99, after_stats.p99);
+    EXPECT_DOUBLE_EQ(cloned_stats.p999, after_stats.p999);
+
+    // Mutating original should not affect cloned.
+    h.Add(42);
+    EXPECT_EQ(cloned->GetSnapshot().count, after_snapshot.count);
+    EXPECT_EQ(h.GetSnapshot().count, after_snapshot.count + 1);
+
+    // Mutating cloned should not affect original.
+    cloned->Add(84);
+    EXPECT_EQ(cloned->GetSnapshot().count, after_snapshot.count + 1);
+    EXPECT_EQ(h.GetSnapshot().count, after_snapshot.count + 1);
+}
+
+// Merge() must combine same-named histograms; Overwrite() must replace the whole metric
+// set, dropping histograms that the source does not have.
+TEST(MetricsImplHistogramTest, TestMergeAndOverwrite) {
+    auto target = std::make_shared<MetricsImpl>();
+    target->ObserveHistogram("h", 1);
+    target->ObserveHistogram("h", 2);
+
+    auto addition = std::make_shared<MetricsImpl>();
+    addition->ObserveHistogram("h", 3);
+    addition->ObserveHistogram("h", 4);
+
+    target->Merge(addition);
+
+    ASSERT_OK_AND_ASSIGN(HistogramStats merged, target->GetHistogramStats("h"));
+    EXPECT_EQ(merged.count, 4);
+    EXPECT_DOUBLE_EQ(merged.min, 1);
+    EXPECT_DOUBLE_EQ(merged.max, 4);
+    EXPECT_NEAR(merged.average, 2.5, 1e-12);
+    EXPECT_DOUBLE_EQ(merged.p50, 2);
+    EXPECT_DOUBLE_EQ(merged.p99, 4);
+
+    auto replacement = std::make_shared<MetricsImpl>();
+    replacement->ObserveHistogram("h2", 100);
+    target->Overwrite(replacement);
+    // "h" existed only in the old state, so it must be gone after the overwrite.
+    ASSERT_NOK_WITH_MSG(target->GetHistogramStats("h"), "Key error: histogram 'h' not found");
+    ASSERT_OK_AND_ASSIGN(HistogramStats replaced, target->GetHistogramStats("h2"));
+    EXPECT_EQ(replaced.count, 1);
+    EXPECT_DOUBLE_EQ(replaced.min, 100);
+    EXPECT_DOUBLE_EQ(replaced.max, 100);
+    EXPECT_DOUBLE_EQ(replaced.stddev, 0);
+}
+
+// ToString() must emit a JSON object holding the counter plus one "<name>.<stat>" entry
+// per histogram statistic.
+TEST(MetricsImplHistogramTest, TestToStringWithHistogram) {
+    auto metrics = std::make_shared<MetricsImpl>();
+    metrics->SetCounter("k1", 1);
+    metrics->ObserveHistogram("h", 1);
+    metrics->ObserveHistogram("h", 2);
+
+    const std::string json = metrics->ToString();
+    rapidjson::Document doc;
+    doc.Parse(json.c_str());
+    ASSERT_TRUE(doc.IsObject());
+    ASSERT_TRUE(doc.HasMember("k1"));
+    EXPECT_EQ(doc["k1"].GetUint64(), 1);
+
+    ASSERT_TRUE(doc.HasMember("h.count"));
+    EXPECT_EQ(doc["h.count"].GetUint64(), 2);
+
+    // The remaining histogram fields only need to be present and numeric.
+    for (const char* key : {"h.min", "h.p99", "h.p99.9", "h.stddev"}) {
+        ASSERT_TRUE(doc.HasMember(key)) << key;
+        EXPECT_TRUE(doc[key].IsNumber()) << key;
+    }
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/metrics/histogram_windowing.cpp
b/src/paimon/common/metrics/histogram_windowing.cpp
new file mode 100644
index 0000000..9b0565f
--- /dev/null
+++ b/src/paimon/common/metrics/histogram_windowing.cpp
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/histogram_windowing.h"
+
+#include <chrono>
+
+namespace paimon {
+
+ // Reads std::chrono::system_clock and returns the time since its epoch in microseconds.
+ uint64_t HistogramWindowingImpl::NowMicros() {
+     const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
+     return std::chrono::duration_cast<std::chrono::microseconds>(since_epoch).count();
+ }
+
+ // Rounds `micros` down to a multiple of micros_per_window_.
+ // A window length of zero leaves the timestamp unchanged (and avoids a modulo by zero).
+ uint64_t HistogramWindowingImpl::AlignWindowStartMicros(uint64_t micros) const {
+     const uint64_t offset_in_window =
+         (micros_per_window_ == 0) ? 0 : (micros % micros_per_window_);
+     return micros - offset_in_window;
+ }
+
+ // Sizes the per-slot vectors to num_windows_ and starts the first window at the aligned
+ // current time. Takes mu_ for the duration of the call.
+ void HistogramWindowingImpl::Init() {
+     std::lock_guard<std::mutex> lock(mu_);
+     histograms_.assign(num_windows_, nullptr);
+     window_start_micros_.assign(num_windows_, std::nullopt);
+     ResetLocked(AlignWindowStartMicros(NowMicros()));
+ }
+
+ // Drops every window and restarts at slot 0 with a window beginning at `aligned_now`.
+ // The "Locked" suffix follows the file's convention: callers invoke this while holding mu_.
+ void HistogramWindowingImpl::ResetLocked(uint64_t aligned_now) {
+     current_index_ = 0;
+     current_window_start_micros_ = aligned_now;
+     current_window_num_ = 0;
+     if (num_windows_ == 0) {
+         // No slots exist, so only the bookkeeping fields are reset.
+         return;
+     }
+
+     for (size_t slot = 0; slot < num_windows_; ++slot) {
+         histograms_[slot].reset();
+         window_start_micros_[slot].reset();
+     }
+     // Slot 0 becomes the only live window.
+     histograms_[current_index_] = std::make_shared<HistogramImpl>();
+     window_start_micros_[current_index_] = current_window_start_micros_;
+ }
+
+ // Moves the current window forward towards `aligned_now`.
+ //  - Does nothing when windowing is disabled (zero window length or zero windows) or when
+ //    `aligned_now` is not later than the current window start.
+ //  - Resets all windows when the gap reaches micros_per_window_ * num_windows_.
+ //  - Otherwise steps one window at a time, and only while the current window has collected
+ //    at least min_num_per_window_ samples.
+ void HistogramWindowingImpl::AdvanceLocked(uint64_t aligned_now) {
+     const bool windowing_disabled = (micros_per_window_ == 0) || (num_windows_ == 0);
+     if (windowing_disabled || aligned_now <= current_window_start_micros_) {
+         return;
+     }
+
+     const uint64_t max_span = micros_per_window_ * num_windows_;
+     if (aligned_now - current_window_start_micros_ >= max_span) {
+         ResetLocked(aligned_now);
+         return;
+     }
+
+     while (true) {
+         const bool window_elapsed =
+             aligned_now > current_window_start_micros_ &&
+             (aligned_now - current_window_start_micros_) >= micros_per_window_;
+         if (!window_elapsed || current_window_num_ < min_num_per_window_) {
+             break;
+         }
+         // Open the next slot in ring order; whatever it held before is replaced.
+         current_window_start_micros_ += micros_per_window_;
+         current_index_ = (current_index_ + 1) % static_cast<size_t>(num_windows_);
+         current_window_num_ = 0;
+         histograms_[current_index_] = std::make_shared<HistogramImpl>();
+         window_start_micros_[current_index_] = current_window_start_micros_;
+     }
+ }
+
+ // Maps a window start timestamp to a slot index.
+ // A slot already stamped with `aligned_start` wins. Otherwise the slot is derived from how
+ // many windows `aligned_start` lies behind the current window; a start later than the current
+ // window (or a zero window length) maps to the current slot.
+ size_t HistogramWindowingImpl::IndexForStartLocked(uint64_t aligned_start) const {
+     const size_t slot_count = window_start_micros_.size();
+     for (size_t slot = 0; slot < slot_count; ++slot) {
+         const std::optional<uint64_t>& slot_start = window_start_micros_[slot];
+         if (slot_start.has_value() && slot_start.value() == aligned_start) {
+             return slot;
+         }
+     }
+
+     // No exact match: fall back to positional mapping relative to the current window.
+     if (micros_per_window_ == 0 || aligned_start > current_window_start_micros_) {
+         return current_index_;
+     }
+     const uint64_t windows_back =
+         (current_window_start_micros_ - aligned_start) / micros_per_window_;
+     return (current_index_ + num_windows_ - (windows_back % num_windows_)) %
+            static_cast<size_t>(num_windows_);
+ }
+
+ // Returns the histogram of the slot chosen by IndexForStartLocked(aligned_start), creating it
+ // if the slot is empty, and stamps that slot with `aligned_start`.
+ // Returns nullptr when there are no windows at all.
+ std::shared_ptr<HistogramImpl> HistogramWindowingImpl::GetOrCreateForStartLocked(
+     uint64_t aligned_start) {
+     if (num_windows_ == 0) {
+         return nullptr;
+     }
+     const size_t slot = IndexForStartLocked(aligned_start);
+     window_start_micros_[slot] = aligned_start;
+     std::shared_ptr<HistogramImpl>& entry = histograms_[slot];
+     if (entry == nullptr) {
+         entry = std::make_shared<HistogramImpl>();
+     }
+     return entry;
+ }
+
+ // Records `value` in the current window.
+ // The window is selected (and advanced if needed) under mu_; the sample itself is added to
+ // the chosen HistogramImpl after mu_ is released. The sample is dropped when there are no
+ // windows or when the aligned clock reads earlier than the current window start.
+ void HistogramWindowingImpl::Add(double value) {
+     if (num_windows_ == 0) {
+         return;
+     }
+
+     std::shared_ptr<HistogramImpl> target;
+     {
+         std::lock_guard<std::mutex> lock(mu_);
+         const uint64_t aligned_now = AlignWindowStartMicros(NowMicros());
+         if (aligned_now < current_window_start_micros_) {
+             // Clock moved backwards relative to the current window: ignore the sample.
+             return;
+         }
+         AdvanceLocked(aligned_now);
+
+         std::shared_ptr<HistogramImpl>& slot = histograms_[current_index_];
+         if (!slot) {
+             slot = std::make_shared<HistogramImpl>();
+             window_start_micros_[current_index_] = current_window_start_micros_;
+         }
+         target = slot;
+         ++current_window_num_;
+     }
+
+     if (target) {
+         target->Add(value);
+     }
+ }
+
+ // Aggregates the retained windows into one HistogramStats.
+ // Slots are snapshotted under mu_ and merged after the lock is released. With a non-zero
+ // window length only windows that started within micros_per_window_ * num_windows_ of the
+ // aligned current time are merged; with a zero window length every populated slot is merged.
+ HistogramStats HistogramWindowingImpl::GetStats() const {
+     if (num_windows_ == 0) {
+         return HistogramStats{};
+     }
+
+     std::vector<std::pair<uint64_t, std::shared_ptr<HistogramImpl>>> snapshot;
+     uint64_t aligned_now = 0;
+     {
+         std::lock_guard<std::mutex> lock(mu_);
+         aligned_now = AlignWindowStartMicros(NowMicros());
+         // Deliberately no AdvanceLocked() here: windows only advance from Add()/Merge(), which
+         // keeps the min_num_per_window gating consistent with RocksDB's behavior.
+         snapshot.reserve(histograms_.size());
+         for (size_t slot = 0; slot < histograms_.size(); ++slot) {
+             if (window_start_micros_[slot].has_value() && histograms_[slot]) {
+                 snapshot.emplace_back(window_start_micros_[slot].value(), histograms_[slot]);
+             }
+         }
+     }
+
+     const bool filter_by_age = (micros_per_window_ != 0);
+     const uint64_t max_span = micros_per_window_ * num_windows_;
+     HistogramImpl merged;
+     for (const auto& [start, histogram] : snapshot) {
+         const bool within_span = aligned_now >= start && (aligned_now - start) < max_span;
+         if (!filter_by_age || within_span) {
+             merged.MergeFrom(*histogram);
+         }
+     }
+     return merged.GetStats();
+ }
+
+ // Merges `other` into this histogram. Three cases:
+ //  1. `other` is a plain HistogramImpl: merged into the current window.
+ //  2. `other` is a HistogramWindowingImpl with different parameters: all of its populated
+ //     windows are merged into the current window.
+ //  3. `other` is a HistogramWindowingImpl with identical parameters: windows are merged slot
+ //     by slot according to their start timestamps, skipping windows outside the max span.
+ // Any other Histogram subtype is ignored. The two objects' mutexes are never held at the same
+ // time: the other side is snapshotted first, then this side is locked.
+ void HistogramWindowingImpl::Merge(const Histogram& other) {
+     // Under mu_: advance to the aligned current time and return the current window's
+     // histogram, or nullptr if the clock reads earlier than the current window start.
+     const auto acquire_current_window = [this]() -> std::shared_ptr<HistogramImpl> {
+         std::lock_guard<std::mutex> lock(mu_);
+         const uint64_t aligned_now = AlignWindowStartMicros(NowMicros());
+         if (aligned_now < current_window_start_micros_) {
+             return nullptr;
+         }
+         AdvanceLocked(aligned_now);
+         return GetOrCreateForStartLocked(current_window_start_micros_);
+     };
+
+     const auto* windowed = dynamic_cast<const HistogramWindowingImpl*>(&other);
+     if (windowed == nullptr) {
+         // Case 1: plain histogram (or an unsupported type, which is ignored).
+         const auto* plain = dynamic_cast<const HistogramImpl*>(&other);
+         if (plain == nullptr) {
+             return;
+         }
+         if (std::shared_ptr<HistogramImpl> target = acquire_current_window()) {
+             target->MergeFrom(*plain);
+         }
+         return;
+     }
+
+     const bool same_parameters = num_windows_ == windowed->num_windows_ &&
+                                  micros_per_window_ == windowed->micros_per_window_ &&
+                                  min_num_per_window_ == windowed->min_num_per_window_;
+     if (!same_parameters) {
+         // Case 2: window layouts are not comparable, so fold everything into "now".
+         std::vector<std::shared_ptr<HistogramImpl>> sources;
+         {
+             std::lock_guard<std::mutex> other_lock(windowed->mu_);
+             sources.reserve(windowed->histograms_.size());
+             for (const auto& histogram : windowed->histograms_) {
+                 if (histogram) {
+                     sources.push_back(histogram);
+                 }
+             }
+         }
+         if (std::shared_ptr<HistogramImpl> target = acquire_current_window()) {
+             for (const auto& source : sources) {
+                 target->MergeFrom(*source);
+             }
+         }
+         return;
+     }
+
+     // Case 3: merge by window start timestamps.
+     std::vector<std::pair<uint64_t, std::shared_ptr<HistogramImpl>>> stamped_sources;
+     {
+         std::lock_guard<std::mutex> other_lock(windowed->mu_);
+         stamped_sources.reserve(windowed->histograms_.size());
+         for (size_t slot = 0; slot < windowed->histograms_.size(); ++slot) {
+             if (windowed->window_start_micros_[slot].has_value() && windowed->histograms_[slot]) {
+                 stamped_sources.emplace_back(windowed->window_start_micros_[slot].value(),
+                                              windowed->histograms_[slot]);
+             }
+         }
+     }
+
+     std::lock_guard<std::mutex> lock(mu_);
+     const uint64_t aligned_now = AlignWindowStartMicros(NowMicros());
+     if (aligned_now < current_window_start_micros_) {
+         return;
+     }
+     AdvanceLocked(aligned_now);
+     const uint64_t max_span = micros_per_window_ * num_windows_;
+     for (const auto& [start, source] : stamped_sources) {
+         const bool within_span = aligned_now >= start && (aligned_now - start) < max_span;
+         if (!within_span) {
+             continue;
+         }
+         std::shared_ptr<HistogramImpl> target = GetOrCreateForStartLocked(start);
+         if (target && source) {
+             target->MergeFrom(*source);
+         }
+     }
+ }
+
+ // Creates an independent copy with the same parameters and window bookkeeping; each
+ // populated slot is duplicated via HistogramImpl::Clone() rather than shared.
+ std::shared_ptr<Histogram> HistogramWindowingImpl::Clone() const {
+     auto copy = std::make_shared<HistogramWindowingImpl>(num_windows_, micros_per_window_,
+                                                          min_num_per_window_);
+     std::lock_guard<std::mutex> source_lock(mu_);
+     std::lock_guard<std::mutex> copy_lock(copy->mu_);
+
+     copy->current_window_start_micros_ = current_window_start_micros_;
+     copy->current_window_num_ = current_window_num_;
+     copy->current_index_ = current_index_;
+     copy->window_start_micros_ = window_start_micros_;
+
+     // Start from empty slots, then deep-copy the populated ones.
+     copy->histograms_.assign(num_windows_, nullptr);
+     for (size_t slot = 0; slot < histograms_.size() && slot < copy->histograms_.size(); ++slot) {
+         if (histograms_[slot]) {
+             copy->histograms_[slot] =
+                 std::dynamic_pointer_cast<HistogramImpl>(histograms_[slot]->Clone());
+         }
+     }
+     return copy;
+ }
+
+} // namespace paimon
diff --git a/src/paimon/common/metrics/histogram_windowing.h
b/src/paimon/common/metrics/histogram_windowing.h
new file mode 100644
index 0000000..bec9b60
--- /dev/null
+++ b/src/paimon/common/metrics/histogram_windowing.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <vector>
+
+#include "paimon/common/metrics/histogram.h"
+
+namespace paimon {
+
+// Window-based histogram implementation.
+//
+// Inspired by RocksDB's monitoring/histogram_windowing.h.
+// Maintains multiple time windows and aggregates stats across the recent ones.
+ class HistogramWindowingImpl : public Histogram {
+  public:
+     // Default configuration: 3 windows of 60 seconds each, with at least 100 samples required
+     // in the current window before it may advance (paimon-java default).
+     HistogramWindowingImpl()
+         : HistogramWindowingImpl(/*num_windows=*/3,
+                                  /*micros_per_window=*/60 * 1000ULL * 1000ULL,
+                                  /*min_num_per_window=*/100) {}
+
+     // num_windows:        number of window slots kept in the ring.
+     // micros_per_window:  length of one window in microseconds.
+     // min_num_per_window: samples the current window must hold before it may advance.
+     HistogramWindowingImpl(uint64_t num_windows, uint64_t micros_per_window,
+                            uint64_t min_num_per_window)
+         : num_windows_(num_windows),
+           micros_per_window_(micros_per_window),
+           min_num_per_window_(min_num_per_window) {
+         Init();
+     }
+
+     // Histogram interface.
+     void Add(double value) override;
+     HistogramStats GetStats() const override;
+     void Merge(const Histogram& other) override;
+     std::shared_ptr<Histogram> Clone() const override;
+
+  private:
+     // Current time in microseconds, and the same value rounded down to a window boundary.
+     static uint64_t NowMicros();
+     uint64_t AlignWindowStartMicros(uint64_t micros) const;
+
+     // Init() takes mu_ itself; the *Locked helpers are called with mu_ already held.
+     void Init();
+     void ResetLocked(uint64_t aligned_now);
+     void AdvanceLocked(uint64_t aligned_now);
+     size_t IndexForStartLocked(uint64_t aligned_start) const;
+     std::shared_ptr<HistogramImpl> GetOrCreateForStartLocked(uint64_t aligned_start);
+
+     // Immutable configuration.
+     const uint64_t num_windows_;
+     const uint64_t micros_per_window_;
+     const uint64_t min_num_per_window_;
+
+     mutable std::mutex mu_;
+
+     // State protected by mu_.
+     uint64_t current_window_start_micros_ = 0;
+     uint64_t current_window_num_ = 0;
+     size_t current_index_ = 0;
+
+     // Per slot: start timestamp (empty when the slot is unused) and its histogram.
+     std::vector<std::optional<uint64_t>> window_start_micros_;
+     std::vector<std::shared_ptr<HistogramImpl>> histograms_;
+ };
+
+} // namespace paimon
diff --git a/src/paimon/common/metrics/histogram_windowing_test.cpp
b/src/paimon/common/metrics/histogram_windowing_test.cpp
new file mode 100644
index 0000000..ea5083c
--- /dev/null
+++ b/src/paimon/common/metrics/histogram_windowing_test.cpp
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/histogram_windowing.h"
+
+#include <chrono>
+#include <thread>
+
+#include "gtest/gtest.h"
+
+namespace paimon::test {
+
+ // Two samples separated by slightly more than one window must both show up in the stats.
+ TEST(HistogramWindowingImplTest, TestAdvanceAndAggregateAcrossWindows) {
+     // A relatively large total span (4 x 25ms) avoids flakiness around the boundary where
+     // aligned_now - start == span.
+     HistogramWindowingImpl histogram(/*num_windows=*/4, /*micros_per_window=*/25000,
+                                      /*min_num_per_window=*/1);
+     histogram.Add(1);
+     std::this_thread::sleep_for(std::chrono::milliseconds(26));
+     histogram.Add(2);
+
+     const HistogramStats stats = histogram.GetStats();
+     EXPECT_EQ(stats.count, 2);
+     EXPECT_DOUBLE_EQ(stats.min, 1);
+     EXPECT_DOUBLE_EQ(stats.max, 2);
+     EXPECT_NEAR(stats.average, 1.5, 1e-12);
+ }
+
+ // After sleeping past the total span, only the most recent sample is expected to remain.
+ TEST(HistogramWindowingImplTest, TestResetWhenBeyondMaxSpan) {
+     HistogramWindowingImpl histogram(/*num_windows=*/2, /*micros_per_window=*/1000,
+                                      /*min_num_per_window=*/1);
+     histogram.Add(1);
+     std::this_thread::sleep_for(std::chrono::milliseconds(2));
+     histogram.Add(2);
+
+     // Sleep long enough that aligned_now - current_window_start >=
+     // micros_per_window * num_windows.
+     std::this_thread::sleep_for(std::chrono::milliseconds(3));
+     histogram.Add(3);
+
+     const HistogramStats stats = histogram.GetStats();
+     EXPECT_EQ(stats.count, 1);
+     EXPECT_DOUBLE_EQ(stats.min, 3);
+     EXPECT_DOUBLE_EQ(stats.max, 3);
+     EXPECT_NEAR(stats.average, 3.0, 1e-12);
+ }
+
+ // Merging two windowed histograms with identical parameters combines their samples.
+ TEST(HistogramWindowingImplTest, TestMergeSameParams) {
+     // One-second windows keep this test from depending on wall-clock timing.
+     HistogramWindowingImpl target(/*num_windows=*/4, /*micros_per_window=*/1000ULL * 1000ULL,
+                                   /*min_num_per_window=*/1);
+     HistogramWindowingImpl source(/*num_windows=*/4, /*micros_per_window=*/1000ULL * 1000ULL,
+                                   /*min_num_per_window=*/1);
+     target.Add(1);
+     target.Add(2);
+     source.Add(3);
+
+     target.Merge(source);
+
+     const HistogramStats stats = target.GetStats();
+     EXPECT_EQ(stats.count, 3);
+     EXPECT_DOUBLE_EQ(stats.sum, 6);
+     EXPECT_DOUBLE_EQ(stats.min, 1);
+     EXPECT_DOUBLE_EQ(stats.max, 3);
+ }
+
+ // When the window counts differ, Merge() still brings over all of the other side's samples.
+ TEST(HistogramWindowingImplTest, TestMergeDifferentParamsFallback) {
+     HistogramWindowingImpl target(/*num_windows=*/4, /*micros_per_window=*/1000ULL * 1000ULL,
+                                   /*min_num_per_window=*/1);
+     HistogramWindowingImpl source(/*num_windows=*/8, /*micros_per_window=*/1000ULL * 1000ULL,
+                                   /*min_num_per_window=*/1);
+     source.Add(10);
+     source.Add(20);
+
+     target.Merge(source);
+
+     const HistogramStats stats = target.GetStats();
+     EXPECT_EQ(stats.count, 2);
+     EXPECT_DOUBLE_EQ(stats.min, 10);
+     EXPECT_DOUBLE_EQ(stats.max, 20);
+ }
+
+ // Exercises min_num_per_window:
+ //  - window advancement is gated by the sample count of the current window;
+ //  - a histogram that does not advance in time may be reset once it is beyond the max span.
+ TEST(HistogramWindowingImplTest, TestMinNumPerWindow100Case) {
+     // A larger window (50ms) reduces wall-clock sensitivity and avoids flakiness.
+     HistogramWindowingImpl histogram(/*num_windows=*/3, /*micros_per_window=*/50000,
+                                      /*min_num_per_window=*/100);
+
+     // Put 99 samples into the current window, one short of min_num_per_window.
+     for (int32_t remaining = 99; remaining > 0; --remaining) {
+         histogram.Add(1);
+     }
+
+     // Cross at least one window boundary.
+     std::this_thread::sleep_for(std::chrono::milliseconds(75));
+     histogram.Add(2);  // 100th sample; the advancement check runs before this add.
+
+     const HistogramStats first = histogram.GetStats();
+     EXPECT_EQ(first.count, 100);
+     EXPECT_DOUBLE_EQ(first.min, 1);
+     EXPECT_DOUBLE_EQ(first.max, 2);
+
+     // Wait until aligned_now - current_window_start >= max_span
+     // (max_span = num_windows * micros_per_window = 150ms here).
+     std::this_thread::sleep_for(std::chrono::milliseconds(175));
+     histogram.Add(1000);
+
+     const HistogramStats second = histogram.GetStats();
+     EXPECT_EQ(second.count, 1);
+     EXPECT_DOUBLE_EQ(second.min, 1000);
+     EXPECT_DOUBLE_EQ(second.max, 1000);
+ }
+
+ // Adds the values 1..n and checks count, min, max, sum and average against closed forms.
+ TEST(HistogramWindowingImplTest, TestLargeDatasetInSingleWindow) {
+     // 60-second windows keep the test independent of wall-clock timing.
+     HistogramWindowingImpl histogram(/*num_windows=*/4,
+                                      /*micros_per_window=*/60ULL * 1000ULL * 1000ULL,
+                                      /*min_num_per_window=*/1);
+
+     constexpr uint64_t n = 10000;
+     for (uint64_t sample = 1; sample <= n; ++sample) {
+         histogram.Add(static_cast<double>(sample));
+     }
+
+     const HistogramStats stats = histogram.GetStats();
+     EXPECT_EQ(stats.count, n);
+     EXPECT_DOUBLE_EQ(stats.min, 1);
+     EXPECT_DOUBLE_EQ(stats.max, static_cast<double>(n));
+     // Sum of 1..n is n * (n + 1) / 2; the average is (n + 1) / 2.
+     EXPECT_DOUBLE_EQ(stats.sum, static_cast<double>(n) * static_cast<double>(n + 1) / 2.0);
+     EXPECT_NEAR(stats.average, (static_cast<double>(n) + 1.0) / 2.0, 1e-12);
+ }
+
+ // Populates more than one window, then crosses max_span so all earlier windows are dropped.
+ TEST(HistogramWindowingImplTest, TestCrossMaxSpanAfterAdvance) {
+     HistogramWindowingImpl histogram(/*num_windows=*/2, /*micros_per_window=*/2000,
+                                      /*min_num_per_window=*/1);
+
+     histogram.Add(1);
+     std::this_thread::sleep_for(std::chrono::milliseconds(3));
+     histogram.Add(2);  // should advance to the next window.
+
+     // Cross max_span (2 windows x 2ms = 4ms) relative to the current window start.
+     std::this_thread::sleep_for(std::chrono::milliseconds(7));
+     histogram.Add(3);  // should reset.
+
+     const HistogramStats stats = histogram.GetStats();
+     EXPECT_EQ(stats.count, 1);
+     EXPECT_DOUBLE_EQ(stats.min, 3);
+     EXPECT_DOUBLE_EQ(stats.max, 3);
+     EXPECT_DOUBLE_EQ(stats.sum, 3);
+ }
+
+ // A clone must report the same statistics as its source and then evolve independently.
+ TEST(HistogramWindowingImplTest, TestCloneConsistencyAndIndependence) {
+     // 60-second windows avoid any dependence on wall clock and window advancement.
+     HistogramWindowingImpl original(/*num_windows=*/4,
+                                     /*micros_per_window=*/60ULL * 1000ULL * 1000ULL,
+                                     /*min_num_per_window=*/1);
+     original.Add(1);
+     original.Add(2);
+     original.Add(3);
+     original.Add(4);
+
+     const HistogramStats original_stats = original.GetStats();
+
+     auto clone = std::dynamic_pointer_cast<HistogramWindowingImpl>(original.Clone());
+     ASSERT_TRUE(clone != nullptr);
+
+     // Consistency: every statistic matches right after cloning.
+     const HistogramStats clone_stats = clone->GetStats();
+     EXPECT_EQ(clone_stats.count, original_stats.count);
+     EXPECT_DOUBLE_EQ(clone_stats.sum, original_stats.sum);
+     EXPECT_DOUBLE_EQ(clone_stats.min, original_stats.min);
+     EXPECT_DOUBLE_EQ(clone_stats.max, original_stats.max);
+     EXPECT_DOUBLE_EQ(clone_stats.average, original_stats.average);
+     EXPECT_DOUBLE_EQ(clone_stats.stddev, original_stats.stddev);
+     EXPECT_DOUBLE_EQ(clone_stats.p50, original_stats.p50);
+     EXPECT_DOUBLE_EQ(clone_stats.p90, original_stats.p90);
+     EXPECT_DOUBLE_EQ(clone_stats.p95, original_stats.p95);
+     EXPECT_DOUBLE_EQ(clone_stats.p99, original_stats.p99);
+     EXPECT_DOUBLE_EQ(clone_stats.p999, original_stats.p999);
+
+     // Independence, direction 1: adding to the original leaves the clone untouched.
+     original.Add(100);
+     const HistogramStats original_after_add = original.GetStats();
+     const HistogramStats clone_after_original_add = clone->GetStats();
+     EXPECT_EQ(clone_after_original_add.count, clone_stats.count);
+     EXPECT_EQ(original_after_add.count, clone_stats.count + 1);
+
+     // Independence, direction 2: adding to the clone leaves the original untouched.
+     clone->Add(200);
+     const HistogramStats clone_after_own_add = clone->GetStats();
+     const HistogramStats original_after_clone_add = original.GetStats();
+     EXPECT_EQ(original_after_clone_add.count, original_after_add.count);
+     EXPECT_EQ(clone_after_own_add.count, clone_stats.count + 1);
+ }
+
+} // namespace paimon::test
diff --git a/src/paimon/common/metrics/metrics_impl.cpp
b/src/paimon/common/metrics/metrics_impl.cpp
new file mode 100644
index 0000000..87d8768
--- /dev/null
+++ b/src/paimon/common/metrics/metrics_impl.cpp
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/metrics_impl.h"
+
+#include <utility>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/metrics/histogram_windowing.h"
+#include "paimon/result.h"
+#include "rapidjson/document.h"
+#include "rapidjson/encodings.h"
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace paimon {
+ // Stores `metric_value` under `metric_name`, replacing any previous value.
+ void MetricsImpl::SetCounter(const std::string& metric_name, uint64_t metric_value) {
+     std::lock_guard<std::mutex> lock(counter_lock_);
+     counters_.insert_or_assign(metric_name, metric_value);
+ }
+
+ // Returns the counter stored under `metric_name`, or a KeyError status if none exists.
+ Result<uint64_t> MetricsImpl::GetCounter(const std::string& metric_name) const {
+     std::lock_guard<std::mutex> lock(counter_lock_);
+     const auto found = counters_.find(metric_name);
+     if (found == counters_.end()) {
+         return Status::KeyError(fmt::format("metric '{}' not found", metric_name));
+     }
+     return found->second;
+ }
+
+ // Returns a copy of all counters, taken while holding counter_lock_.
+ std::map<std::string, uint64_t> MetricsImpl::GetAllCounters() const {
+     std::lock_guard<std::mutex> lock(counter_lock_);
+     std::map<std::string, uint64_t> snapshot = counters_;
+     return snapshot;
+ }
+
+ // Adds `value` to the histogram named `metric_name`, creating a default-configured
+ // HistogramWindowingImpl on first use. The map lookup happens under histogram_lock_; the
+ // sample is added after that lock is released.
+ void MetricsImpl::ObserveHistogram(const std::string& metric_name, double value) {
+     std::shared_ptr<Histogram> target;
+     {
+         std::lock_guard<std::mutex> lock(histogram_lock_);
+         const auto found = histograms_.find(metric_name);
+         if (found != histograms_.end()) {
+             target = found->second;
+         } else {
+             target = std::make_shared<HistogramWindowingImpl>();
+             histograms_.emplace(metric_name, target);
+         }
+     }
+     target->Add(value);
+ }
+
+ // Returns the statistics of the histogram named `metric_name`, or a KeyError status if no
+ // such histogram exists. GetStats() is called after histogram_lock_ has been released.
+ Result<HistogramStats> MetricsImpl::GetHistogramStats(const std::string& metric_name) const {
+     std::shared_ptr<Histogram> target;
+     {
+         std::lock_guard<std::mutex> lock(histogram_lock_);
+         const auto found = histograms_.find(metric_name);
+         if (found == histograms_.end()) {
+             return Status::KeyError(fmt::format("histogram '{}' not found", metric_name));
+         }
+         target = found->second;
+     }
+     return target->GetStats();
+ }
+
+ // Returns the statistics of every histogram, keyed by name. The (name, histogram) pairs are
+ // copied under histogram_lock_; the statistics are computed after the lock is released.
+ std::map<std::string, HistogramStats> MetricsImpl::GetAllHistogramStats() const {
+     std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> snapshot;
+     {
+         std::lock_guard<std::mutex> lock(histogram_lock_);
+         snapshot.assign(histograms_.begin(), histograms_.end());
+     }
+
+     std::map<std::string, HistogramStats> stats_by_name;
+     for (const auto& entry : snapshot) {
+         stats_by_name.emplace(entry.first, entry.second->GetStats());
+     }
+     return stats_by_name;
+ }
+
+ // Stores `value` under `metric_name`, replacing any previous gauge value.
+ void MetricsImpl::SetGauge(const std::string& metric_name, double value) {
+     std::lock_guard<std::mutex> guard(gauge_lock_);
+     gauges_.insert_or_assign(metric_name, value);
+ }
+
+ // Returns the gauge stored under `metric_name`, or a KeyError status if none exists.
+ Result<double> MetricsImpl::GetGauge(const std::string& metric_name) const {
+     std::lock_guard<std::mutex> guard(gauge_lock_);
+     const auto found = gauges_.find(metric_name);
+     if (found == gauges_.end()) {
+         return Status::KeyError(fmt::format("metric '{}' not found", metric_name));
+     }
+     return found->second;
+ }
+
+ // Returns a copy of all gauges, taken while holding gauge_lock_.
+ std::map<std::string, double> MetricsImpl::GetAllGauges() const {
+     std::lock_guard<std::mutex> guard(gauge_lock_);
+     std::map<std::string, double> snapshot = gauges_;
+     return snapshot;
+ }
+
+ // Merges `other` into this object:
+ //  - counters and gauges with the same name are summed, new names are inserted;
+ //  - histograms are merged only when `other` is a MetricsImpl; a missing histogram on this
+ //    side is created as a default HistogramWindowingImpl first.
+ // A null `other` or a self-merge is a no-op.
+ void MetricsImpl::Merge(const std::shared_ptr<Metrics>& other) {
+     if (!other || this == other.get()) {
+         return;
+     }
+
+     const std::map<std::string, uint64_t> incoming_counters = other->GetAllCounters();
+     {
+         std::lock_guard<std::mutex> lock(counter_lock_);
+         for (const auto& [name, value] : incoming_counters) {
+             const auto [slot, inserted] = counters_.try_emplace(name, value);
+             if (!inserted) {
+                 slot->second += value;
+             }
+         }
+     }
+
+     const std::map<std::string, double> incoming_gauges = other->GetAllGauges();
+     {
+         std::lock_guard<std::mutex> lock(gauge_lock_);
+         for (const auto& [name, value] : incoming_gauges) {
+             const auto [slot, inserted] = gauges_.try_emplace(name, value);
+             if (!inserted) {
+                 slot->second += value;
+             }
+         }
+     }
+
+     const auto other_impl = std::dynamic_pointer_cast<MetricsImpl>(other);
+     if (!other_impl) {
+         return;
+     }
+
+     // Snapshot the other side's histograms so its lock is not held while merging.
+     std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> incoming_histograms;
+     {
+         std::lock_guard<std::mutex> lock(other_impl->histogram_lock_);
+         incoming_histograms.reserve(other_impl->histograms_.size());
+         for (const auto& entry : other_impl->histograms_) {
+             incoming_histograms.push_back(entry);
+         }
+     }
+
+     for (const auto& [name, incoming] : incoming_histograms) {
+         std::shared_ptr<Histogram> target;
+         {
+             std::lock_guard<std::mutex> lock(histogram_lock_);
+             const auto found = histograms_.find(name);
+             if (found != histograms_.end()) {
+                 target = found->second;
+             } else {
+                 target = std::make_shared<HistogramWindowingImpl>();
+                 histograms_.emplace(name, target);
+             }
+         }
+         target->Merge(*incoming);
+     }
+ }
+
+ // Replaces the contents of this object with those of `other`.
+ // Counters and gauges are copied from `other`. Histograms are replaced by clones of the other
+ // side's histograms when `other` is a MetricsImpl, and cleared otherwise.
+ // A null `other` or a self-overwrite is a no-op.
+ void MetricsImpl::Overwrite(const std::shared_ptr<Metrics>& other) {
+     if (!other || this == other.get()) {
+         return;
+     }
+
+     std::map<std::string, uint64_t> replacement_counters = other->GetAllCounters();
+     {
+         std::lock_guard<std::mutex> lock(counter_lock_);
+         counters_.swap(replacement_counters);
+     }
+
+     std::map<std::string, double> replacement_gauges = other->GetAllGauges();
+     {
+         std::lock_guard<std::mutex> lock(gauge_lock_);
+         gauges_.swap(replacement_gauges);
+     }
+
+     std::map<std::string, std::shared_ptr<Histogram>> replacement_histograms;
+     if (const auto other_impl = std::dynamic_pointer_cast<MetricsImpl>(other)) {
+         // Snapshot under the other side's lock, clone after releasing it.
+         std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> sources;
+         {
+             std::lock_guard<std::mutex> lock(other_impl->histogram_lock_);
+             sources.reserve(other_impl->histograms_.size());
+             for (const auto& entry : other_impl->histograms_) {
+                 sources.push_back(entry);
+             }
+         }
+         for (const auto& entry : sources) {
+             replacement_histograms.emplace(entry.first, entry.second->Clone());
+         }
+     }
+
+     std::lock_guard<std::mutex> lock(histogram_lock_);
+     histograms_.swap(replacement_histograms);
+ }
+
+ // Serializes all metrics into one flat JSON object.
+ // Members are added in this order: counters, then for each histogram the keys
+ // "<name>.count", ".sum", ".min", ".max", ".avg", ".p50", ".p90", ".p95", ".p99", ".p99.9",
+ // ".stddev", and finally the gauges. Flattening keeps the JSON schema simple, e.g.
+ // "read.latency.p99", "read.latency.avg", "read.latency.count".
+ std::string MetricsImpl::ToString() const {
+     using JsonWriter =
+         rapidjson::Writer<rapidjson::StringBuffer, rapidjson::UTF8<>, rapidjson::ASCII<>>;
+     rapidjson::Document doc;
+     doc.SetObject();
+     rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
+
+     // Adds a double-valued member named `key` to the document.
+     const auto add_double = [&doc, &allocator](const std::string& key, double number) {
+         rapidjson::Value json_number;
+         json_number.SetDouble(number);
+         doc.AddMember(rapidjson::Value(key, allocator), json_number, allocator);
+     };
+
+     const std::map<std::string, uint64_t> counters = GetAllCounters();
+     for (const auto& [name, value] : counters) {
+         doc.AddMember(rapidjson::Value(name, allocator), rapidjson::Value(value), allocator);
+     }
+
+     const std::map<std::string, HistogramStats> histogram_stats = GetAllHistogramStats();
+     for (const auto& [name, stats] : histogram_stats) {
+         doc.AddMember(rapidjson::Value(name + ".count", allocator),
+                       rapidjson::Value(stats.count), allocator);
+         add_double(name + ".sum", stats.sum);
+         add_double(name + ".min", stats.min);
+         add_double(name + ".max", stats.max);
+         add_double(name + ".avg", stats.average);
+         add_double(name + ".p50", stats.p50);
+         add_double(name + ".p90", stats.p90);
+         add_double(name + ".p95", stats.p95);
+         add_double(name + ".p99", stats.p99);
+         add_double(name + ".p99.9", stats.p999);
+         add_double(name + ".stddev", stats.stddev);
+     }
+
+     const std::map<std::string, double> gauges = GetAllGauges();
+     for (const auto& [name, value] : gauges) {
+         doc.AddMember(rapidjson::Value(name, allocator), rapidjson::Value(value), allocator);
+     }
+
+     rapidjson::StringBuffer buffer;
+     JsonWriter writer(buffer);
+     doc.Accept(writer);
+     return buffer.GetString();
+ }
+
+} // namespace paimon
diff --git a/src/paimon/common/metrics/metrics_impl.h
b/src/paimon/common/metrics/metrics_impl.h
new file mode 100644
index 0000000..a36a59c
--- /dev/null
+++ b/src/paimon/common/metrics/metrics_impl.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "paimon/metrics.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+class Histogram;
+
+ // Metrics implementation holding named counters, gauges and histograms.
+ // Each of the three maps is guarded by its own mutex.
+ class PAIMON_EXPORT MetricsImpl : public Metrics {
+  public:
+     ~MetricsImpl() override = default;
+
+     // Counters.
+     void SetCounter(const std::string& metric_name, uint64_t metric_value) override;
+     Result<uint64_t> GetCounter(const std::string& metric_name) const override;
+     std::map<std::string, uint64_t> GetAllCounters() const override;
+
+     // Histograms.
+     void ObserveHistogram(const std::string& metric_name, double value) override;
+     Result<HistogramStats> GetHistogramStats(const std::string& metric_name) const override;
+     std::map<std::string, HistogramStats> GetAllHistogramStats() const override;
+
+     // Gauges.
+     void SetGauge(const std::string& metric_name, double metric_value) override;
+     Result<double> GetGauge(const std::string& metric_name) const override;
+     std::map<std::string, double> GetAllGauges() const override;
+
+     // Merge() combines `other` into this object; Overwrite() replaces this object's contents
+     // with those of `metrics`; ToString() renders everything as a JSON string.
+     void Merge(const std::shared_ptr<Metrics>& other) override;
+     std::string ToString() const override;
+     void Overwrite(const std::shared_ptr<Metrics>& metrics);
+
+     // Merges the metrics of every non-null reader in `readers` into a fresh MetricsImpl.
+     // Each element must be dereferenceable with `->` and provide GetReaderMetrics(); readers
+     // whose GetReaderMetrics() result is null are skipped.
+     template <typename T>
+     static std::shared_ptr<Metrics> CollectReadMetrics(const T& readers) {
+         auto collected = std::make_shared<MetricsImpl>();
+         for (const auto& reader : readers) {
+             if (reader) {
+                 auto reader_metrics = reader->GetReaderMetrics();
+                 if (reader_metrics) {
+                     collected->Merge(reader_metrics);
+                 }
+             }
+         }
+         return collected;
+     }
+
+  private:
+     mutable std::mutex counter_lock_;  // guards counters_
+     std::map<std::string, uint64_t> counters_;
+
+     mutable std::mutex histogram_lock_;  // guards histograms_
+     std::map<std::string, std::shared_ptr<Histogram>> histograms_;
+
+     mutable std::mutex gauge_lock_;  // guards gauges_
+     std::map<std::string, double> gauges_;
+ };
+
+} // namespace paimon
diff --git a/src/paimon/common/metrics/metrics_impl_test.cpp b/src/paimon/common/metrics/metrics_impl_test.cpp
new file mode 100644
index 0000000..b481988
--- /dev/null
+++ b/src/paimon/common/metrics/metrics_impl_test.cpp
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/metrics_impl.h"
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Walks the counter API end to end: set/get round trip, Merge() adding same-named
+// counters, and Overwrite() replacing the whole counter set.
+TEST(MetricsImplTest, TestSimple) {
+    auto target = std::make_shared<MetricsImpl>();
+    auto source = std::make_shared<MetricsImpl>();
+
+    // Counters read back exactly as they were set.
+    target->SetCounter("some_metric", 100);
+    target->SetCounter("some_metric_2", 150);
+    ASSERT_OK_AND_ASSIGN(uint64_t observed, target->GetCounter("some_metric"));
+    ASSERT_EQ(100, observed);
+    ASSERT_OK_AND_ASSIGN(observed, target->GetCounter("some_metric_2"));
+    ASSERT_EQ(150, observed);
+
+    // Merge: the shared name is summed, the name only `source` has is adopted.
+    source->SetCounter("some_metric_2", 200);
+    source->SetCounter("some_metric_3", 300);
+    target->Merge(source);
+    ASSERT_OK_AND_ASSIGN(observed, target->GetCounter("some_metric"));
+    ASSERT_EQ(100, observed);
+    ASSERT_OK_AND_ASSIGN(observed, target->GetCounter("some_metric_2"));
+    ASSERT_EQ(350, observed);
+    ASSERT_OK_AND_ASSIGN(observed, target->GetCounter("some_metric_3"));
+    ASSERT_EQ(300, observed);
+
+    // Overwrite: `target` now mirrors `source`.
+    target->Overwrite(source);
+    ASSERT_OK_AND_ASSIGN(observed, target->GetCounter("some_metric_2"));
+    ASSERT_EQ(200, observed);
+    ASSERT_OK_AND_ASSIGN(observed, target->GetCounter("some_metric_3"));
+    ASSERT_EQ(300, observed);
+    // some_metric was absent from `source`, so it must no longer exist after the overwrite.
+    ASSERT_NOK_WITH_MSG(target->GetCounter("some_metric"),
+                        "Key error: metric 'some_metric' not found");
+}
+
+// Gauge counterpart of the counter test: Merge() adds same-named gauges and adopts new
+// ones, Overwrite() leaves only the gauges of the other object.
+TEST(MetricsImplTest, TestGaugeMergeAndOverwrite) {
+    auto target = std::make_shared<MetricsImpl>();
+    auto source = std::make_shared<MetricsImpl>();
+
+    target->SetGauge("g1", 1.5);
+    target->SetGauge("g2", 2.0);
+    source->SetGauge("g2", 3.25);
+    source->SetGauge("g3", 4.75);
+
+    // After merging: g1 untouched, g2 = 2.0 + 3.25, g3 taken over from `source`.
+    target->Merge(source);
+    ASSERT_OK_AND_ASSIGN(double observed, target->GetGauge("g1"));
+    EXPECT_DOUBLE_EQ(1.5, observed);
+    ASSERT_OK_AND_ASSIGN(observed, target->GetGauge("g2"));
+    EXPECT_DOUBLE_EQ(5.25, observed);
+    ASSERT_OK_AND_ASSIGN(observed, target->GetGauge("g3"));
+    EXPECT_DOUBLE_EQ(4.75, observed);
+
+    // After overwriting: only the gauges of `source` remain, with its values.
+    target->Overwrite(source);
+    ASSERT_OK_AND_ASSIGN(observed, target->GetGauge("g2"));
+    EXPECT_DOUBLE_EQ(3.25, observed);
+    ASSERT_OK_AND_ASSIGN(observed, target->GetGauge("g3"));
+    EXPECT_DOUBLE_EQ(4.75, observed);
+    ASSERT_NOK_WITH_MSG(target->GetGauge("g1"), "Key error: metric 'g1' not found");
+}
+
+// Pins the textual form of a merged metrics object: one JSON object holding the
+// counters followed by the gauges.
+TEST(MetricsImplTest, TestToString) {
+    auto merged = std::make_shared<MetricsImpl>();
+    merged->SetCounter("k1", 1);
+    merged->SetCounter("k2", 2);
+    merged->SetGauge("g1", 1.25);
+
+    auto extra = std::make_shared<MetricsImpl>();
+    extra->SetCounter("m1", 3);
+    extra->SetCounter("m2", 4);
+    extra->SetCounter("k2", 5);
+    extra->SetGauge("g2", 2.5);
+
+    // k2 exists on both sides, so the merged value is 2 + 5.
+    merged->Merge(extra);
+    const std::string rendered = merged->ToString();
+    EXPECT_EQ(rendered, "{\"k1\":1,\"k2\":7,\"m1\":3,\"m2\":4,\"g1\":1.25,\"g2\":2.5}");
+}
+
+} // namespace paimon::test