This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c6dbe1  feat: add executor future helpers and metrics utilities (#47)
2c6dbe1 is described below

commit 2c6dbe12d6f0a07a25d1ebd50ab512f25496dc91
Author: Yonghao Fang <[email protected]>
AuthorDate: Thu Jun 4 14:31:18 2026 +0800

    feat: add executor future helpers and metrics utilities (#47)
---
 include/paimon/metrics.h                           |  87 ++++++
 .../common/executor/default_executor_test.cpp      | 127 +++++++++
 src/paimon/common/executor/future.h                |  92 +++++++
 .../common/factories/factory_creator_test.cpp      |  95 +++++++
 src/paimon/common/factories/io_hook_test.cpp       |  67 +++++
 src/paimon/common/metrics/histogram.cpp            | 222 +++++++++++++++
 src/paimon/common/metrics/histogram.h              |  92 +++++++
 src/paimon/common/metrics/histogram_test.cpp       | 244 ++++++++++++++++
 src/paimon/common/metrics/histogram_windowing.cpp  | 306 +++++++++++++++++++++
 src/paimon/common/metrics/histogram_windowing.h    |  84 ++++++
 .../common/metrics/histogram_windowing_test.cpp    | 211 ++++++++++++++
 src/paimon/common/metrics/metrics_impl.cpp         | 279 +++++++++++++++++++
 src/paimon/common/metrics/metrics_impl.h           |  81 ++++++
 src/paimon/common/metrics/metrics_impl_test.cpp    |  96 +++++++
 14 files changed, 2083 insertions(+)

diff --git a/include/paimon/metrics.h b/include/paimon/metrics.h
new file mode 100644
index 0000000..1db5f5f
--- /dev/null
+++ b/include/paimon/metrics.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/type_fwd.h"
+
+namespace paimon {
+
+/// Statistics snapshot for a histogram metric.
+///
+/// Note: percentile values are estimated from internal buckets.
+/// Every field defaults to zero, so a default-constructed value describes an
+/// empty histogram.
+struct PAIMON_EXPORT HistogramStats {
+    uint64_t count = 0;  // number of recorded samples
+    double sum = 0;      // sum of all recorded samples
+    double min = 0;      // smallest recorded sample
+    double max = 0;      // largest recorded sample
+    double average = 0;  // arithmetic mean of the samples
+    double p50 = 0;      // estimated 50th percentile (median)
+    double p90 = 0;      // estimated 90th percentile
+    double p95 = 0;      // estimated 95th percentile
+    double p99 = 0;      // estimated 99th percentile
+    double p999 = 0;     // estimated 99.9th percentile
+    double stddev = 0;   // standard deviation of the samples
+};
+
+/// Abstract interface for collecting and managing performance metrics in
+/// Paimon operations.
+///
+/// Three metric families are exposed, each addressed by a string name:
+/// counters (uint64_t), histograms (summarised as HistogramStats) and
+/// gauges (double).
+class PAIMON_EXPORT Metrics {
+ public:
+    virtual ~Metrics() = default;
+
+    /// Set the value of a specific counter metric.
+    virtual void SetCounter(const std::string& metric_name, uint64_t 
metric_value) = 0;
+
+    /// Get the current value of a specific counter metric.
+    /// NOTE(review): presumably yields an error Result for an unknown name —
+    /// confirm against the implementation.
+    virtual Result<uint64_t> GetCounter(const std::string& metric_name) const 
= 0;
+
+    /// Get all counter metrics as a map.
+    virtual std::map<std::string, uint64_t> GetAllCounters() const = 0;
+
+    /// Add a sample to a histogram metric.
+    virtual void ObserveHistogram(const std::string& metric_name, double 
value) = 0;
+
+    /// Get histogram statistics snapshot.
+    virtual Result<HistogramStats> GetHistogramStats(const std::string& 
metric_name) const = 0;
+
+    /// Get all histogram statistics snapshots.
+    virtual std::map<std::string, HistogramStats> GetAllHistogramStats() const 
= 0;
+
+    /// Set the value of a specific gauge metric.
+    virtual void SetGauge(const std::string& metric_name, double metric_value) 
= 0;
+
+    /// Get the current value of a specific gauge metric.
+    virtual Result<double> GetGauge(const std::string& metric_name) const = 0;
+
+    /// Get all gauge metrics as a map.
+    virtual std::map<std::string, double> GetAllGauges() const = 0;
+
+    /// Merge metrics from another Metrics instance into this one.
+    virtual void Merge(const std::shared_ptr<Metrics>& other) = 0;
+
+    /// Convert all metrics to a JSON string representation.
+    virtual std::string ToString() const = 0;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/executor/default_executor_test.cpp 
b/src/paimon/common/executor/default_executor_test.cpp
new file mode 100644
index 0000000..26d84f8
--- /dev/null
+++ b/src/paimon/common/executor/default_executor_test.cpp
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+#include <future>
+#include <memory>
+#include <stdexcept>
+#include <thread>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/executor.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon::test {
+
+// Submits ten void tasks through Via() and checks that, once Wait() returns,
+// each of them has incremented the shared counter exactly once.
+TEST(DefaultExecutorTest, TestViaVoidFunc) {
+    auto executor = GetGlobalDefaultExecutor();
+    std::atomic<int64_t> sum = {0};
+    std::vector<std::future<void>> futures;
+    for (int32_t index = 0; index < 10; ++index) {
+        futures.push_back(Via(executor.get(), [&sum]() { sum++; }));
+    }
+    // Wait() blocks until every task finished, so `sum` outlives all tasks.
+    Wait(futures);
+    ASSERT_EQ(10, sum.load());
+}
+
+// Submits ten value-returning tasks and checks that CollectAll() returns the
+// results in submission order.
+TEST(DefaultExecutorTest, TestVia) {
+    auto executor = GetGlobalDefaultExecutor();
+    std::atomic<int64_t> sum = {0};
+    std::vector<std::future<int>> futures;
+    for (int32_t index = 0; index < 10; ++index) {
+        futures.push_back(Via(executor.get(), [index, &sum]() -> int32_t {
+            sum++;
+            return index * 2;
+        }));
+    }
+    auto results = CollectAll(futures);
+    ASSERT_EQ(10, results.size());
+    // Task i returns 2 * i; the order follows the futures vector.
+    std::vector<int> expected = {0, 2, 4, 6, 8, 10, 12, 14, 16, 18};
+    ASSERT_EQ(expected, results);
+    ASSERT_EQ(10, sum.load());
+}
+
+// Checks that Via() can carry a Result<T> payload: tasks for positive inputs
+// return a vector, tasks for negative inputs return Status::Invalid.
+// NOTE(review): only the number of results is asserted; the ok/error state of
+// each individual Result is not checked.
+TEST(DefaultExecutorTest, TestViaWithResult) {
+    auto executor = GetGlobalDefaultExecutor();
+    std::vector<std::future<Result<std::vector<int32_t>>>> futures;
+    std::vector<int32_t> inputs = {-2, -1, 1, 2};
+    for (const auto& input : inputs) {
+        futures.push_back(Via(executor.get(), [input]() -> 
Result<std::vector<int32_t>> {
+            if (input > 0) {
+                std::vector<int32_t> output = {-2, -1, 1, 2};
+                return output;
+            }
+            return Status::Invalid("negative");
+        }));
+    }
+    auto results = CollectAll(futures);
+    ASSERT_EQ(4, results.size());
+}
+
+// An exception thrown inside the task must be rethrown from future.get().
+TEST(DefaultExecutorTest, TestViaWithException) {
+    auto executor = GetGlobalDefaultExecutor();
+    auto future = Via(executor.get(), []() { throw std::runtime_error("test"); 
});
+    ASSERT_THROW(future.get(), std::runtime_error);
+}
+
+// With a single worker thread, the first task blocks until released while 20
+// more tasks are queued behind it. ShutdownNow() is then called from another
+// thread; the test expects only the blocked first task to have run.
+TEST(DefaultExecutorTest, TestShutdownNowDropsPendingTasks) {
+    auto executor = CreateDefaultExecutor(/*thread_count=*/1);
+    std::atomic<bool> first_started = false;
+    std::atomic<int32_t> executed_count = 0;
+    std::promise<void> release_first_task;
+    auto release_future = release_first_task.get_future();
+    // First task: signal that it started, then park until the promise is set.
+    executor->Add([&]() {
+        first_started.store(true);
+        release_future.wait();
+        ++executed_count;
+    });
+
+    // These tasks are queued behind the parked one.
+    for (int32_t index = 0; index < 20; ++index) {
+        executor->Add([&]() { ++executed_count; });
+    }
+
+    // Poll (up to 100 x 100 ms) until the worker has picked up the first task.
+    for (int32_t retry = 0; retry < 100 && !first_started.load(); ++retry) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+    ASSERT_TRUE(first_started.load());
+    std::thread shutdown_thread([&]() { executor->ShutdownNow(); });
+    // Give ShutdownNow() time to take effect before unblocking the first task.
+    // NOTE(review): this relies on a fixed 1 s sleep rather than a handshake.
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    release_first_task.set_value();
+    shutdown_thread.join();
+
+    ASSERT_EQ(executed_count.load(), 1);
+}
+
+// A task added after ShutdownNow() is expected never to run.
+TEST(DefaultExecutorTest, TestAddTaskAfterShutdownNowIgnored) {
+    auto executor = CreateDefaultExecutor(/*thread_count=*/1);
+    std::atomic<int32_t> executed_count = 0;
+
+    executor->ShutdownNow();
+    executor->Add([&]() { ++executed_count; });
+
+    // Short grace period so a wrongly accepted task would have time to run.
+    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    ASSERT_EQ(executed_count.load(), 0);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/executor/future.h 
b/src/paimon/common/executor/future.h
new file mode 100644
index 0000000..91f8d70
--- /dev/null
+++ b/src/paimon/common/executor/future.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <functional>
+#include <future>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "paimon/executor.h"
+
+namespace paimon {
+
+namespace future_detail {
+
+/// Detects std::function specializations, i.e. callable wrappers that may be empty.
+template <typename T>
+struct IsStdFunction : std::false_type {};
+
+template <typename Signature>
+struct IsStdFunction<std::function<Signature>> : std::true_type {};
+
+}  // namespace future_detail
+
+/// Submits a function to be executed asynchronously on a given executor and returns a future.
+///
+/// This function wraps the provided callable and submits it to the provided executor. The
+/// result is delivered through a std::promise that fulfills the returned std::future. If the
+/// callable throws an exception, the exception is captured and set in the promise, so it is
+/// rethrown from std::future::get().
+///
+/// @param executor Executor that runs the task; must not be null.
+/// @param func Callable taking no arguments. It is moved (or copied, for lvalues) into the
+///             submitted task. An empty std::function or null function pointer is a
+///             programming error and trips an assert in debug builds.
+/// @return A future for the callable's result (std::future<void> for void callables).
+template <typename Func>
+auto Via(Executor* executor, Func&& func) -> std::future<std::invoke_result_t<Func>> {
+    using ResultType = std::invoke_result_t<Func>;
+    using Callable = std::remove_cv_t<std::remove_reference_t<Func>>;
+
+    static_assert(std::is_invocable_v<Func>, "func must be callable");
+    assert(executor != nullptr && "executor cannot be null");
+
+    // Only nullable callables can be empty. Probe them in place instead of copying every
+    // callable into a temporary std::function, which cost a copy even in release builds.
+    if constexpr (future_detail::IsStdFunction<Callable>::value || std::is_pointer_v<Callable>) {
+        assert(static_cast<bool>(func) && "func cannot be an empty std::function");
+    }
+
+    // The promise is shared so the task lambda stays copyable.
+    auto promise = std::make_shared<std::promise<ResultType>>();
+    auto future = promise->get_future();
+
+    executor->Add([promise, func = std::forward<Func>(func)]() mutable {
+        try {
+            if constexpr (std::is_void_v<ResultType>) {
+                func();
+                promise->set_value();
+            } else {
+                promise->set_value(func());
+            }
+        } catch (...) {
+            promise->set_exception(std::current_exception());
+        }
+    });
+
+    return future;
+}
+
+/// Blocks on every future in order and returns their values in the same order.
+///
+/// Each future is consumed by get(), so all of them are invalid afterwards. An exception
+/// stored in a future propagates out of this function.
+template <typename T>
+std::vector<T> CollectAll(std::vector<std::future<T>>& futures) {
+    const std::size_t total = futures.size();
+    std::vector<T> collected;
+    collected.reserve(total);
+    for (std::size_t pos = 0; pos < total; ++pos) {
+        collected.push_back(futures[pos].get());
+    }
+    return collected;
+}
+
+/// Blocks until every valid std::future<void> in `futures` has completed.
+///
+/// Futures that are already invalid are skipped. Completed futures are consumed by get(),
+/// which also rethrows any exception stored in them.
+inline void Wait(std::vector<std::future<void>>& futures) {
+    for (std::size_t pos = 0; pos < futures.size(); ++pos) {
+        std::future<void>& pending = futures[pos];
+        if (!pending.valid()) {
+            continue;
+        }
+        pending.get();
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/factories/factory_creator_test.cpp 
b/src/paimon/common/factories/factory_creator_test.cpp
new file mode 100644
index 0000000..96ed1dd
--- /dev/null
+++ b/src/paimon/common/factories/factory_creator_test.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/factories/factory_creator.h"
+
+#include <algorithm>
+#include <new>
+
+#include "gtest/gtest.h"
+#include "paimon/factories/factory.h"
+
+namespace paimon::test {
+
+// Minimal Factory used by the tests below. Every instance reports the same
+// identifier ("mock") and carries a name so tests can tell instances apart.
+class MockFactory : public Factory {
+ public:
+    explicit MockFactory(const std::string& name) : name_(name) {}
+    const char* Identifier() const override {
+        return "mock";
+    }
+    // Returns the name given at construction.
+    std::string GetName() const {
+        return name_;
+    }
+
+ private:
+    std::string name_;
+};
+
+// Fixture operating on the process-wide FactoryCreator instance.
+class FactoryCreatorTest : public ::testing::Test {
+ protected:
+    void SetUp() override {
+        factory_creator_ = FactoryCreator::GetInstance();
+    }
+
+    // Resets the shared instance between tests by destroying it in place and
+    // constructing a fresh FactoryCreator in the same storage.
+    // NOTE(review): presumably the destructor releases the registered
+    // factories — confirm against FactoryCreator.
+    void TearDown() override {
+        factory_creator_->~FactoryCreator();
+        new (factory_creator_) FactoryCreator();
+    }
+
+    FactoryCreator* factory_creator_;
+};
+
+// Registers two factories under distinct type names and checks that Create()
+// hands back the matching instance for each name.
+TEST_F(FactoryCreatorTest, RegisterAndCreateFactory) {
+    // NOTE(review): raw `new` with no matching delete here — presumably
+    // Register() takes ownership; confirm against FactoryCreator.
+    auto factory1 = new MockFactory("Factory1");
+    auto factory2 = new MockFactory("Factory2");
+
+    factory_creator_->Register("type1", factory1);
+    factory_creator_->Register("type2", factory2);
+
+    Factory* created_factory1 = factory_creator_->Create("type1");
+    Factory* created_factory2 = factory_creator_->Create("type2");
+
+    ASSERT_NE(created_factory1, nullptr);
+    ASSERT_NE(created_factory2, nullptr);
+
+    EXPECT_EQ(static_cast<MockFactory*>(created_factory1)->GetName(), 
"Factory1");
+    EXPECT_EQ(static_cast<MockFactory*>(created_factory2)->GetName(), 
"Factory2");
+}
+
+// After registering two types, GetRegisteredType() must list exactly those
+// two names; their order is not asserted.
+TEST_F(FactoryCreatorTest, GetRegisteredType) {
+    auto factory1 = new MockFactory("Factory1");
+    auto factory2 = new MockFactory("Factory2");
+
+    factory_creator_->Register("type1", factory1);
+    factory_creator_->Register("type2", factory2);
+
+    std::vector<std::string> registered_types = 
factory_creator_->GetRegisteredType();
+    EXPECT_EQ(registered_types.size(), 2);
+    EXPECT_NE(std::find(registered_types.begin(), registered_types.end(), 
"type1"),
+              registered_types.end());
+    EXPECT_NE(std::find(registered_types.begin(), registered_types.end(), 
"type2"),
+              registered_types.end());
+}
+
+// Create() for a type name that was never registered must return nullptr.
+TEST_F(FactoryCreatorTest, CreateNonExistentFactory) {
+    Factory* created_factory = factory_creator_->Create("nonexistent");
+    EXPECT_EQ(created_factory, nullptr);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/factories/io_hook_test.cpp 
b/src/paimon/common/factories/io_hook_test.cpp
new file mode 100644
index 0000000..9bbb1b3
--- /dev/null
+++ b/src/paimon/common/factories/io_hook_test.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/factories/io_hook.h"
+
+#include <stdexcept>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// In RETURN_ERROR mode every Try() must report a non-OK status while still
+// being counted by IOCount().
+TEST(IOHookTest, TestReturnErrorMode) {
+    auto hook = IOHook::GetInstance();
+    // NOTE(review): the first argument (0) presumably is the I/O position at
+    // which the hook starts to trigger — confirm against IOHook::Reset.
+    hook->Reset(0, IOHook::Mode::RETURN_ERROR);
+    ASSERT_NOK(hook->Try("path"));
+    ASSERT_NOK(hook->Try("path"));
+    ASSERT_EQ(2, hook->IOCount());
+    // The hook is a shared instance; clear it so later tests start clean.
+    hook->Clear();
+}
+
+// In SILENT mode Try() must succeed, yet each call is still counted.
+TEST(IOHookTest, TestSilentMode) {
+    auto hook = IOHook::GetInstance();
+    hook->Reset(0, IOHook::Mode::SILENT);
+    ASSERT_OK(hook->Try("path"));
+    ASSERT_OK(hook->Try("path"));
+    ASSERT_EQ(2, hook->IOCount());
+    // The hook is a shared instance; clear it so later tests start clean.
+    hook->Clear();
+}
+
+// Two GetInstance() calls must return the same object.
+TEST(IOHookTest, TestSingleton) {
+    auto hook = IOHook::GetInstance();
+    auto hook2 = IOHook::GetInstance();
+    ASSERT_EQ(hook, hook2);
+    hook->Clear();
+}
+
+// In THROW_EXCEPTION mode Try() must throw std::runtime_error; throwing calls
+// are still counted by IOCount().
+TEST(IOHookTest, TestThrowExceptionMode) {
+    auto hook = IOHook::GetInstance();
+    hook->Reset(0, IOHook::Mode::THROW_EXCEPTION);
+    // Wrapper that discards the returned status so only the throw is observed.
+    auto Try = [hook]() {
+        auto s = hook->Try("path");
+        (void)s;
+    };
+    EXPECT_THROW(Try(), std::runtime_error);
+    EXPECT_THROW(Try(), std::runtime_error);
+    ASSERT_EQ(2, hook->IOCount());
+    // The hook is a shared instance; clear it so later tests start clean.
+    hook->Clear();
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/metrics/histogram.cpp 
b/src/paimon/common/metrics/histogram.cpp
new file mode 100644
index 0000000..73ccbae
--- /dev/null
+++ b/src/paimon/common/metrics/histogram.cpp
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/histogram.h"
+
+#include <algorithm>
+#include <cmath>
+
+namespace paimon {
+
+// Creates an empty histogram with one zeroed counter per bucket limit.
+HistogramImpl::HistogramImpl() : bucket_counts_(BucketLimits().size(), 0) {}
+
+// Records one sample.
+//
+// NaN samples are dropped. All state updates happen under mu_, so concurrent
+// callers are serialized.
+void HistogramImpl::Add(double value) {
+    if (std::isnan(value)) {
+        return;
+    }
+    std::lock_guard<std::mutex> guard(mu_);
+    const auto& limits = BucketLimits();
+    size_t idx = BucketIndex(value, limits);
+    bucket_counts_[idx] += 1;
+    if (num_ == 0) {
+        // Seed both extremes from the first sample. Relying on the initial
+        // value of max_ (0) would report max == 0 for all-negative samples.
+        min_ = value;
+        max_ = value;
+    } else {
+        min_ = std::min(min_, value);
+        max_ = std::max(max_, value);
+    }
+    num_ += 1;
+    sum_ += value;
+    sum_squares_ += value * value;
+}
+
+// Takes a consistent snapshot and derives the public statistics from it.
+HistogramStats HistogramImpl::GetStats() const {
+    const Snapshot snapshot = GetSnapshot();
+    return ToStats(snapshot);
+}
+
+// Copies the accumulators while holding mu_. An empty histogram reports
+// min and max as 0 instead of its internal initial values.
+HistogramImpl::Snapshot HistogramImpl::GetSnapshot() const {
+    std::lock_guard<std::mutex> lock(mu_);
+    const bool has_samples = (num_ != 0);
+    Snapshot snapshot;
+    snapshot.count = num_;
+    snapshot.sum = sum_;
+    snapshot.sum_squares = sum_squares_;
+    if (has_samples) {
+        snapshot.min = min_;
+        snapshot.max = max_;
+    } else {
+        snapshot.min = 0;
+        snapshot.max = 0;
+    }
+    snapshot.bucket_counts = bucket_counts_;
+    return snapshot;
+}
+
+// Merges `other` into this histogram when it is a HistogramImpl; any other
+// Histogram implementation is ignored.
+void HistogramImpl::Merge(const Histogram& other) {
+    if (const auto* impl = dynamic_cast<const HistogramImpl*>(&other)) {
+        MergeFrom(*impl);
+    }
+}
+
+// Adds the samples of `other` to this histogram.
+//
+// `other` is snapshotted first (under its own lock) and only then is mu_
+// taken, so the two locks are never held together.
+void HistogramImpl::MergeFrom(const HistogramImpl& other) {
+    const Snapshot incoming = other.GetSnapshot();
+    std::lock_guard<std::mutex> lock(mu_);
+    const size_t bucket_total = bucket_counts_.size();
+    // A size mismatch should never happen unless the bucket config changes;
+    // an empty source contributes nothing.
+    if (bucket_total != incoming.bucket_counts.size() || incoming.count == 0) {
+        return;
+    }
+    for (size_t b = 0; b != bucket_total; ++b) {
+        bucket_counts_[b] += incoming.bucket_counts[b];
+    }
+    // While this histogram is empty its own min/max are placeholders.
+    const bool was_empty = (num_ == 0);
+    min_ = was_empty ? incoming.min : std::min(min_, incoming.min);
+    max_ = was_empty ? incoming.max : std::max(max_, incoming.max);
+    num_ += incoming.count;
+    sum_ += incoming.sum;
+    sum_squares_ += incoming.sum_squares;
+}
+
+// Returns an independent copy that holds the same samples.
+std::shared_ptr<Histogram> HistogramImpl::Clone() const {
+    Snapshot snap = GetSnapshot();
+    auto copy = std::make_shared<HistogramImpl>();
+    {
+        std::lock_guard<std::mutex> lock(copy->mu_);
+        copy->bucket_counts_ = std::move(snap.bucket_counts);
+        copy->num_ = snap.count;
+        copy->sum_ = snap.sum;
+        copy->sum_squares_ = snap.sum_squares;
+        // The snapshot reports min as 0 when empty; restore the internal
+        // initial value so later samples can lower it.
+        if (snap.count == 0) {
+            copy->min_ = std::numeric_limits<double>::infinity();
+        } else {
+            copy->min_ = snap.min;
+        }
+        copy->max_ = snap.max;
+    }
+    return copy;
+}
+
+// Converts a snapshot into the public HistogramStats. Percentiles are bucket
+// estimates; the average of an empty snapshot is 0.
+HistogramStats HistogramImpl::ToStats(const Snapshot& s) {
+    HistogramStats result;
+    result.count = s.count;
+    result.sum = s.sum;
+    result.min = s.min;
+    result.max = s.max;
+    if (s.count == 0) {
+        result.average = 0;
+    } else {
+        result.average = s.sum / static_cast<double>(s.count);
+    }
+    result.p50 = EstimatePercentile(s, 0.50);
+    result.p90 = EstimatePercentile(s, 0.90);
+    result.p95 = EstimatePercentile(s, 0.95);
+    result.p99 = EstimatePercentile(s, 0.99);
+    result.p999 = EstimatePercentile(s, 0.999);
+    result.stddev = EstimateStddev(s);
+    return result;
+}
+
+// Returns the shared, immutable list of bucket upper bounds in ascending
+// order. The list is built once on first use (function-local static).
+const std::vector<double>& HistogramImpl::BucketLimits() {
+    static const std::vector<double> limits = []() {
+        // Keep bucket generation aligned with RocksDB's HistogramBucketMapper
+        // (see rocksdb/monitoring/histogram.cc).
+        //
+        // The mapper starts from {1,2}, then grows by *1.5 and rounds each
+        // bucket boundary to be human-readable by keeping the two most
+        // significant digits.
+        std::vector<double> v;
+        v.reserve(256);
+        v.push_back(1);
+        v.push_back(2);
+
+        // bucket_val tracks the unrounded growth; only the stored limit is
+        // rounded.
+        double bucket_val = v.back();
+        const auto max_u64 = 
static_cast<double>(std::numeric_limits<uint64_t>::max());
+        while ((bucket_val = 1.5 * bucket_val) <= max_u64) {
+            auto rounded = static_cast<uint64_t>(bucket_val);
+            // Keep two most significant digits (e.g., 172 -> 170).
+            uint64_t pow_of_ten = 1;
+            while (rounded / 10 > 10) {
+                rounded /= 10;
+                pow_of_ten *= 10;
+            }
+            rounded *= pow_of_ten;
+            v.push_back(static_cast<double>(rounded));
+        }
+        return v;
+    }();
+    return limits;
+}
+
+// Maps a sample to the index of the first bucket whose limit is >= value.
+// Samples at or below the first limit go to bucket 0; samples above every
+// limit go to the last bucket.
+size_t HistogramImpl::BucketIndex(double value, const std::vector<double>& limits) {
+    if (value <= limits.front()) {
+        return 0;
+    }
+    const auto first_not_less = std::lower_bound(limits.begin(), limits.end(), value);
+    return (first_not_less == limits.end())
+               ? limits.size() - 1
+               : static_cast<size_t>(std::distance(limits.begin(), first_not_less));
+}
+
+// Estimates the p-quantile (p in [0, 1]) of a snapshot from its bucket counts.
+//
+// Returns 0 for an empty snapshot, s.min for p <= 0 and s.max for p >= 1.
+// Otherwise the bucket containing the target rank is located and the value is
+// interpolated linearly inside it, then clamped to [s.min, s.max].
+double HistogramImpl::EstimatePercentile(const Snapshot& s, double p) {
+    if (s.count == 0) {
+        return 0;
+    }
+    if (p <= 0) {
+        return s.min;
+    }
+    if (p >= 1) {
+        return s.max;
+    }
+    const auto& limits = BucketLimits();
+    // 1-based rank of the sample to locate.
+    const auto rank = static_cast<uint64_t>(std::ceil(p * 
static_cast<double>(s.count)));
+    uint64_t cum = 0;
+    for (size_t i = 0; i < s.bucket_counts.size(); ++i) {
+        uint64_t bcnt = s.bucket_counts[i];
+        if (bcnt == 0) {
+            continue;
+        }
+        // prev = samples in earlier buckets, cum = samples up to this bucket.
+        uint64_t prev = cum;
+        cum += bcnt;
+        if (cum < rank) {
+            continue;
+        }
+
+        // Interpolate inside bucket: estimate that samples are uniformly
+        // distributed between bucket lower and upper bounds.
+        double upper = (i < limits.size()) ? limits[i] : s.max;
+        // The first bucket has no lower limit; use 0, or s.min when negative.
+        double lower = (i == 0) ? std::min(s.min, 0.0) : limits[i - 1];
+        if (std::isinf(upper)) {
+            return s.max;
+        }
+        const double pos_in_bucket = static_cast<double>(rank - prev) / 
static_cast<double>(bcnt);
+        double est = lower + (upper - lower) * pos_in_bucket;
+        // Bucket bounds can exceed the observed range; clamp to it.
+        if (est < s.min) {
+            est = s.min;
+        }
+        if (est > s.max) {
+            est = s.max;
+        }
+        return est;
+    }
+    // Reached only if the bucket counts add up to less than rank.
+    return s.max;
+}
+
+// Population standard deviation computed from the running sums as
+// sqrt(E[x^2] - E[x]^2); returns 0 for an empty snapshot.
+double HistogramImpl::EstimateStddev(const Snapshot& s) {
+    if (s.count == 0) {
+        return 0;
+    }
+    const auto samples = static_cast<double>(s.count);
+    const double mean = s.sum / samples;
+    const double mean_of_squares = s.sum_squares / samples;
+    // Rounding can push the difference slightly below zero; floor it at 0.
+    const double variance = std::max(0.0, mean_of_squares - mean * mean);
+    return std::sqrt(variance);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/metrics/histogram.h 
b/src/paimon/common/metrics/histogram.h
new file mode 100644
index 0000000..aec6178
--- /dev/null
+++ b/src/paimon/common/metrics/histogram.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "paimon/metrics.h"
+
+namespace paimon {
+
+// Internal histogram interface used by MetricsImpl.
+//
+// Note: this interface is intentionally NOT exported.
+class Histogram {
+ public:
+    virtual ~Histogram() = default;
+
+    // Records one sample.
+    virtual void Add(double value) = 0;
+    // Returns a statistics summary of the recorded samples.
+    virtual HistogramStats GetStats() const = 0;
+    // Folds the samples of `other` into this histogram.
+    virtual void Merge(const Histogram& other) = 0;
+    // Returns a copy of this histogram.
+    virtual std::shared_ptr<Histogram> Clone() const = 0;
+};
+
+// Internal histogram implementation.
+//
+// The bucket boundaries are aligned with RocksDB's HistogramBucketMapper
+// (rocksdb/monitoring/histogram.cc).
+//
+// All mutable state is guarded by mu_.
+class HistogramImpl : public Histogram {
+ public:
+    HistogramImpl();
+
+    void Add(double value) override;
+
+    HistogramStats GetStats() const override;
+
+    void Merge(const Histogram& other) override;
+
+    std::shared_ptr<Histogram> Clone() const override;
+
+    // Plain copy of the accumulators, taken under the lock.
+    struct Snapshot {
+        uint64_t count = 0;
+        double sum = 0;
+        double sum_squares = 0;
+        double min = 0;
+        double max = 0;
+        std::vector<uint64_t> bucket_counts;  // one counter per bucket limit
+    };
+
+    // Returns a copy of the current accumulators.
+    Snapshot GetSnapshot() const;
+
+    // Adds the samples of another HistogramImpl to this one.
+    void MergeFrom(const HistogramImpl& other);
+
+    // Derives the public statistics from a snapshot.
+    static HistogramStats ToStats(const Snapshot& s);
+
+ private:
+    static const std::vector<double>& BucketLimits();
+    static size_t BucketIndex(double value, const std::vector<double>& limits);
+    static double EstimatePercentile(const Snapshot& s, double p);
+    static double EstimateStddev(const Snapshot& s);
+
+ private:
+    mutable std::mutex mu_;   // guards every member below
+    uint64_t num_ = 0;        // number of samples
+    double sum_ = 0;          // sum of samples
+    double sum_squares_ = 0;  // sum of squared samples
+    // min_ starts at +infinity and max_ at 0; GetSnapshot() reports both as 0
+    // while the histogram is empty.
+    double min_ = std::numeric_limits<double>::infinity();
+    double max_ = 0;
+    std::vector<uint64_t> bucket_counts_;  // per-bucket sample counts
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/metrics/histogram_test.cpp 
b/src/paimon/common/metrics/histogram_test.cpp
new file mode 100644
index 0000000..2833480
--- /dev/null
+++ b/src/paimon/common/metrics/histogram_test.cpp
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/histogram.h"
+
+#include <algorithm>
+#include <cmath>
+#include <limits>
+#include <random>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/testing/utils/testharness.h"
+#include "rapidjson/document.h"
+
+namespace paimon::test {
+
+// Three small samples: every statistic can be checked against a hand-computed value.
+TEST(HistogramImplTest, TestBasicStats) {
+    HistogramImpl histogram;
+    for (const double sample : {1.0, 2.0, 3.0}) {
+        histogram.Add(sample);
+    }
+
+    const HistogramStats stats = histogram.GetStats();
+    EXPECT_EQ(stats.count, 3);
+    EXPECT_DOUBLE_EQ(stats.min, 1);
+    EXPECT_DOUBLE_EQ(stats.max, 3);
+    EXPECT_NEAR(stats.average, 2.0, 1e-12);
+    // Population standard deviation of {1, 2, 3}.
+    EXPECT_NEAR(stats.stddev, std::sqrt(2.0 / 3.0), 1e-12);
+    EXPECT_DOUBLE_EQ(stats.p50, 2);
+    EXPECT_DOUBLE_EQ(stats.p90, 3);
+    EXPECT_DOUBLE_EQ(stats.p95, 3);
+    EXPECT_DOUBLE_EQ(stats.p99, 3);
+    EXPECT_DOUBLE_EQ(stats.p999, 3);
+}
+
+// Feeds 20000 seeded pseudo-random integers (negatives included) and compares the
+// histogram's moments against the same quantities accumulated by the test itself.
+TEST(HistogramImplTest, TestLargeDatasetExactMoments) {
+    constexpr int32_t kSamples = 20000;
+    std::mt19937 generator(12345);
+    std::uniform_int_distribution<int> value_dist(-100, 5000);
+
+    HistogramImpl histogram;
+    double expected_sum = 0;
+    double expected_sum_squares = 0;
+    double expected_min = std::numeric_limits<double>::infinity();
+    double expected_max = -std::numeric_limits<double>::infinity();
+
+    for (int32_t remaining = kSamples; remaining > 0; --remaining) {
+        const auto sample = static_cast<double>(value_dist(generator));
+        histogram.Add(sample);
+        expected_sum += sample;
+        expected_sum_squares += sample * sample;
+        expected_min = std::min(expected_min, sample);
+        expected_max = std::max(expected_max, sample);
+    }
+
+    const HistogramStats stats = histogram.GetStats();
+    EXPECT_EQ(stats.count, static_cast<uint64_t>(kSamples));
+    EXPECT_DOUBLE_EQ(stats.sum, expected_sum);
+    EXPECT_DOUBLE_EQ(stats.min, expected_min);
+    EXPECT_DOUBLE_EQ(stats.max, expected_max);
+
+    // Population variance via E[x^2] - E[x]^2, clamped at zero against rounding.
+    const double mean = expected_sum / static_cast<double>(kSamples);
+    const double ex2 = expected_sum_squares / static_cast<double>(kSamples);
+    const double variance = std::max(0.0, ex2 - mean * mean);
+    EXPECT_DOUBLE_EQ(stats.average, mean);
+    EXPECT_DOUBLE_EQ(stats.stddev, std::sqrt(variance));
+
+    // Percentiles are estimates, so only bounds and ordering are asserted.
+    EXPECT_GE(stats.p50, stats.min);
+    EXPECT_LE(stats.p50, stats.max);
+    EXPECT_GE(stats.p90, stats.p50);
+    EXPECT_GE(stats.p95, stats.p90);
+    EXPECT_GE(stats.p99, stats.p95);
+    EXPECT_GE(stats.p999, stats.p99);
+    EXPECT_LE(stats.p999, stats.max);
+}
+
+// Splits one seeded sample stream across two histograms (even / odd positions) and
+// checks that merging them reproduces the stats of a histogram that saw everything.
+TEST(HistogramImplTest, TestMergeMatchesSingleHistogram) {
+    constexpr int32_t kSamples = 50000;
+    std::mt19937 generator(7);
+    std::uniform_int_distribution<int> value_dist(0, 100000);
+
+    HistogramImpl even_half;
+    HistogramImpl odd_half;
+    HistogramImpl combined;
+
+    for (int32_t position = 0; position < kSamples; ++position) {
+        const auto sample = static_cast<double>(value_dist(generator));
+        combined.Add(sample);
+        HistogramImpl& half = (position % 2 == 0) ? even_half : odd_half;
+        half.Add(sample);
+    }
+    even_half.Merge(odd_half);
+
+    const HistogramStats merged = even_half.GetStats();
+    const HistogramStats single = combined.GetStats();
+
+    EXPECT_EQ(merged.count, single.count);
+    EXPECT_DOUBLE_EQ(merged.sum, single.sum);
+    EXPECT_DOUBLE_EQ(merged.min, single.min);
+    EXPECT_DOUBLE_EQ(merged.max, single.max);
+    EXPECT_DOUBLE_EQ(merged.average, single.average);
+    EXPECT_DOUBLE_EQ(merged.stddev, single.stddev);
+    EXPECT_DOUBLE_EQ(merged.p50, single.p50);
+    EXPECT_DOUBLE_EQ(merged.p90, single.p90);
+    EXPECT_DOUBLE_EQ(merged.p95, single.p95);
+    EXPECT_DOUBLE_EQ(merged.p99, single.p99);
+    EXPECT_DOUBLE_EQ(merged.p999, single.p999);
+}
+
+// A clone must start out equal to its source (raw snapshot and derived stats) and
+// must not share state with it afterwards.
+TEST(HistogramImplTest, TestCloneConsistencyAndIndependence) {
+    HistogramImpl source;
+    // Samples spread over several buckets; the NaN entry is expected to be ignored.
+    const double nan_sample = std::numeric_limits<double>::quiet_NaN();
+    const std::vector<double> samples = {1, 2, 3, 10, 100, 0.5, -5, nan_sample};
+    for (const double sample : samples) {
+        source.Add(sample);
+    }
+
+    const std::shared_ptr<Histogram> clone_as_base = source.Clone();
+    const auto clone = std::dynamic_pointer_cast<HistogramImpl>(clone_as_base);
+    ASSERT_TRUE(clone != nullptr);
+
+    // Read the source only after Clone(): a Clone() that mutated its source would
+    // surface as a mismatch below.
+    const auto source_snapshot = source.GetSnapshot();
+    const auto source_stats = source.GetStats();
+
+    const auto clone_snapshot = clone->GetSnapshot();
+    EXPECT_EQ(clone_snapshot.count, source_snapshot.count);
+    EXPECT_DOUBLE_EQ(clone_snapshot.sum, source_snapshot.sum);
+    EXPECT_DOUBLE_EQ(clone_snapshot.sum_squares, source_snapshot.sum_squares);
+    EXPECT_DOUBLE_EQ(clone_snapshot.min, source_snapshot.min);
+    EXPECT_DOUBLE_EQ(clone_snapshot.max, source_snapshot.max);
+    EXPECT_EQ(clone_snapshot.bucket_counts, source_snapshot.bucket_counts);
+
+    const auto clone_stats = clone->GetStats();
+    EXPECT_EQ(clone_stats.count, source_stats.count);
+    EXPECT_DOUBLE_EQ(clone_stats.sum, source_stats.sum);
+    EXPECT_DOUBLE_EQ(clone_stats.min, source_stats.min);
+    EXPECT_DOUBLE_EQ(clone_stats.max, source_stats.max);
+    EXPECT_DOUBLE_EQ(clone_stats.average, source_stats.average);
+    EXPECT_DOUBLE_EQ(clone_stats.stddev, source_stats.stddev);
+    EXPECT_DOUBLE_EQ(clone_stats.p50, source_stats.p50);
+    EXPECT_DOUBLE_EQ(clone_stats.p90, source_stats.p90);
+    EXPECT_DOUBLE_EQ(clone_stats.p95, source_stats.p95);
+    EXPECT_DOUBLE_EQ(clone_stats.p99, source_stats.p99);
+    EXPECT_DOUBLE_EQ(clone_stats.p999, source_stats.p999);
+
+    const auto baseline_count = source_snapshot.count;
+
+    // Writing to the source leaves the clone untouched.
+    source.Add(42);
+    EXPECT_EQ(clone->GetSnapshot().count, baseline_count);
+    EXPECT_EQ(source.GetSnapshot().count, baseline_count + 1);
+
+    // Writing to the clone leaves the source untouched.
+    clone->Add(84);
+    EXPECT_EQ(clone->GetSnapshot().count, baseline_count + 1);
+    EXPECT_EQ(source.GetSnapshot().count, baseline_count + 1);
+}
+
+// Merge() combines same-named histograms; Overwrite() replaces the whole metric set.
+TEST(MetricsImplHistogramTest, TestMergeAndOverwrite) {
+    auto target = std::make_shared<MetricsImpl>();
+    target->ObserveHistogram("h", 1);
+    target->ObserveHistogram("h", 2);
+
+    auto addition = std::make_shared<MetricsImpl>();
+    addition->ObserveHistogram("h", 3);
+    addition->ObserveHistogram("h", 4);
+
+    target->Merge(addition);
+
+    ASSERT_OK_AND_ASSIGN(HistogramStats merged, target->GetHistogramStats("h"));
+    EXPECT_EQ(merged.count, 4);
+    EXPECT_DOUBLE_EQ(merged.min, 1);
+    EXPECT_DOUBLE_EQ(merged.max, 4);
+    EXPECT_NEAR(merged.average, 2.5, 1e-12);
+    EXPECT_DOUBLE_EQ(merged.p50, 2);
+    EXPECT_DOUBLE_EQ(merged.p99, 4);
+
+    // After Overwrite() only the replacement's histogram ("h2") is left.
+    auto replacement = std::make_shared<MetricsImpl>();
+    replacement->ObserveHistogram("h2", 100);
+    target->Overwrite(replacement);
+    ASSERT_NOK_WITH_MSG(target->GetHistogramStats("h"), "Key error: histogram 'h' not found");
+    ASSERT_OK_AND_ASSIGN(HistogramStats replaced, target->GetHistogramStats("h2"));
+    EXPECT_EQ(replaced.count, 1);
+    EXPECT_DOUBLE_EQ(replaced.min, 100);
+    EXPECT_DOUBLE_EQ(replaced.max, 100);
+    EXPECT_DOUBLE_EQ(replaced.stddev, 0);
+}
+
+// ToString() must emit a JSON object holding the counter plus the flattened
+// "<name>.<stat>" entries of each histogram.
+TEST(MetricsImplHistogramTest, TestToStringWithHistogram) {
+    auto metrics = std::make_shared<MetricsImpl>();
+    metrics->SetCounter("k1", 1);
+    metrics->ObserveHistogram("h", 1);
+    metrics->ObserveHistogram("h", 2);
+
+    rapidjson::Document parsed;
+    parsed.Parse(metrics->ToString().c_str());
+    ASSERT_TRUE(parsed.IsObject());
+
+    ASSERT_TRUE(parsed.HasMember("k1"));
+    EXPECT_EQ(parsed["k1"].GetUint64(), 1);
+
+    ASSERT_TRUE(parsed.HasMember("h.count"));
+    EXPECT_EQ(parsed["h.count"].GetUint64(), 2);
+
+    // The remaining histogram fields only need to be present and numeric.
+    for (const char* key : {"h.min", "h.p99", "h.p99.9", "h.stddev"}) {
+        ASSERT_TRUE(parsed.HasMember(key)) << key;
+        EXPECT_TRUE(parsed[key].IsNumber()) << key;
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/metrics/histogram_windowing.cpp 
b/src/paimon/common/metrics/histogram_windowing.cpp
new file mode 100644
index 0000000..9b0565f
--- /dev/null
+++ b/src/paimon/common/metrics/histogram_windowing.cpp
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/histogram_windowing.h"
+
+#include <chrono>
+
+namespace paimon {
+
+// Current std::chrono::system_clock time as microseconds since that clock's epoch.
+uint64_t HistogramWindowingImpl::NowMicros() {
+    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
+    return std::chrono::duration_cast<std::chrono::microseconds>(since_epoch).count();
+}
+
+// Rounds `micros` down to a multiple of micros_per_window_. With a zero window
+// width there is no grid to snap to, so the timestamp is returned unchanged.
+uint64_t HistogramWindowingImpl::AlignWindowStartMicros(uint64_t micros) const {
+    const uint64_t offset_in_window =
+        (micros_per_window_ == 0) ? 0 : (micros % micros_per_window_);
+    return micros - offset_in_window;
+}
+
+// Sizes both per-slot vectors to num_windows_ and opens the first window at the
+// current (aligned) time. Invoked from the constructors.
+void HistogramWindowingImpl::Init() {
+    std::lock_guard<std::mutex> lock(mu_);
+    histograms_.assign(num_windows_, nullptr);
+    window_start_micros_.assign(num_windows_, std::nullopt);
+    ResetLocked(AlignWindowStartMicros(NowMicros()));
+}
+
+// Drops every window and restarts the ring at slot 0 with a window that begins at
+// `aligned_now`. The callers in this file invoke it with mu_ held.
+void HistogramWindowingImpl::ResetLocked(uint64_t aligned_now) {
+    current_index_ = 0;
+    current_window_num_ = 0;
+    current_window_start_micros_ = aligned_now;
+    if (num_windows_ == 0) {
+        // No slots exist; only the bookkeeping above applies.
+        return;
+    }
+
+    for (size_t slot = 0; slot < num_windows_; ++slot) {
+        histograms_[slot].reset();
+        window_start_micros_[slot] = std::nullopt;
+    }
+    histograms_[current_index_] = std::make_shared<HistogramImpl>();
+    window_start_micros_[current_index_] = current_window_start_micros_;
+}
+
+// Moves the live window forward towards `aligned_now`. The callers in this file
+// invoke it with mu_ held.
+void HistogramWindowingImpl::AdvanceLocked(uint64_t aligned_now) {
+    const bool rotation_disabled = (micros_per_window_ == 0) || (num_windows_ == 0);
+    if (rotation_disabled || aligned_now <= current_window_start_micros_) {
+        return;
+    }
+
+    // The live window started at least one full span ago: start over from a clean ring.
+    if (aligned_now - current_window_start_micros_ >= micros_per_window_ * num_windows_) {
+        ResetLocked(aligned_now);
+        return;
+    }
+
+    // Step one window at a time. A window is only closed once it holds at least
+    // min_num_per_window_ samples; a freshly opened window starts at zero samples,
+    // so with a non-zero minimum at most one step is taken per call.
+    for (;;) {
+        if (aligned_now <= current_window_start_micros_) {
+            break;
+        }
+        const uint64_t elapsed = aligned_now - current_window_start_micros_;
+        if (elapsed < micros_per_window_ || current_window_num_ < min_num_per_window_) {
+            break;
+        }
+        const uint64_t next_start = current_window_start_micros_ + micros_per_window_;
+        const size_t next_index = (current_index_ + 1) % static_cast<size_t>(num_windows_);
+        histograms_[next_index] = std::make_shared<HistogramImpl>();
+        window_start_micros_[next_index] = next_start;
+        current_window_start_micros_ = next_start;
+        current_index_ = next_index;
+        current_window_num_ = 0;
+    }
+}
+
+// Maps a window start timestamp to a slot index. The callers in this file invoke it
+// with mu_ held and only when num_windows_ is non-zero.
+size_t HistogramWindowingImpl::IndexForStartLocked(uint64_t aligned_start) const {
+    // Preferred: a slot that is already labelled with exactly this start.
+    const size_t slot_count = window_start_micros_.size();
+    for (size_t slot = 0; slot < slot_count; ++slot) {
+        const std::optional<uint64_t>& label = window_start_micros_[slot];
+        if (label.has_value() && label.value() == aligned_start) {
+            return slot;
+        }
+    }
+
+    // No labelled slot. Without a window width, or for a start newer than the live
+    // window, fall back to the live slot.
+    if (micros_per_window_ == 0 || aligned_start > current_window_start_micros_) {
+        return current_index_;
+    }
+
+    // Otherwise walk backwards around the ring by the number of whole windows
+    // between the requested start and the live window.
+    const uint64_t windows_back =
+        (current_window_start_micros_ - aligned_start) / micros_per_window_;
+    const uint64_t wrapped_back = windows_back % num_windows_;
+    return static_cast<size_t>((current_index_ + num_windows_ - wrapped_back) % num_windows_);
+}
+
+// Returns the histogram that should receive data for the window starting at
+// `aligned_start`, creating it on demand. Must be called with mu_ held.
+// Returns nullptr when no windows are configured.
+//
+// The slot is chosen by IndexForStartLocked(). A slot's label is never rewritten
+// while the slot still holds another window's samples:
+//   - an exact label match is returned as-is;
+//   - when the lookup falls back to the live slot (for example a start newer than
+//     the live window), the data goes into the live window and the slot keeps the
+//     live window's own start, so it stays consistent with
+//     current_window_start_micros_;
+//   - a non-live slot labelled with a different start is stale and is replaced by
+//     a fresh histogram before being relabelled.
+std::shared_ptr<HistogramImpl> HistogramWindowingImpl::GetOrCreateForStartLocked(
+    uint64_t aligned_start) {
+    if (num_windows_ == 0) {
+        return nullptr;
+    }
+    const size_t idx = IndexForStartLocked(aligned_start);
+    std::optional<uint64_t>& slot_start = window_start_micros_[idx];
+    std::shared_ptr<HistogramImpl>& slot_hist = histograms_[idx];
+
+    // Exact match: nothing to relabel.
+    if (slot_hist && slot_start.has_value() && slot_start.value() == aligned_start) {
+        return slot_hist;
+    }
+
+    if (idx == current_index_) {
+        // Fallback onto the live window: keep its samples and its own label.
+        if (!slot_hist) {
+            slot_hist = std::make_shared<HistogramImpl>();
+        }
+        slot_start = current_window_start_micros_;
+        return slot_hist;
+    }
+
+    // Non-live slot: reuse it only if it is empty; samples recorded under a
+    // different start must not be mixed into this window.
+    const bool stale =
+        slot_hist && slot_start.has_value() && slot_start.value() != aligned_start;
+    if (!slot_hist || stale) {
+        slot_hist = std::make_shared<HistogramImpl>();
+    }
+    slot_start = aligned_start;
+    return slot_hist;
+}
+
+// Records `value` in the live window, rotating windows first if enough time has
+// passed. A histogram configured with zero windows ignores the sample.
+void HistogramWindowingImpl::Add(double value) {
+    if (num_windows_ == 0) {
+        return;
+    }
+
+    // Select the target histogram under mu_; the sample itself is added after the
+    // lock is released.
+    std::shared_ptr<HistogramImpl> active;
+    {
+        std::lock_guard<std::mutex> lock(mu_);
+        const uint64_t aligned_now = AlignWindowStartMicros(NowMicros());
+        if (aligned_now < current_window_start_micros_) {
+            // The clock went backwards relative to the live window: drop the sample.
+            return;
+        }
+        AdvanceLocked(aligned_now);
+
+        std::shared_ptr<HistogramImpl>& live_slot = histograms_[current_index_];
+        if (!live_slot) {
+            live_slot = std::make_shared<HistogramImpl>();
+            window_start_micros_[current_index_] = current_window_start_micros_;
+        }
+        active = live_slot;
+        ++current_window_num_;
+    }
+
+    if (active) {
+        active->Add(value);
+    }
+}
+
+// Aggregates every retained window that still lies inside the span
+// (num_windows_ * micros_per_window_) ending at the current time and returns the
+// combined statistics. A histogram with zero windows yields default stats.
+HistogramStats HistogramWindowingImpl::GetStats() const {
+    if (num_windows_ == 0) {
+        return HistogramStats{};
+    }
+
+    using StampedHistogram = std::pair<uint64_t, std::shared_ptr<HistogramImpl>>;
+    std::vector<StampedHistogram> retained;
+    uint64_t aligned_now = 0;
+    {
+        std::lock_guard<std::mutex> lock(mu_);
+        aligned_now = AlignWindowStartMicros(NowMicros());
+        // Deliberately no AdvanceLocked() here: as in RocksDB's min_num_per_window
+        // handling, windows only rotate on Add() once they hold enough samples.
+        retained.reserve(histograms_.size());
+        for (size_t slot = 0; slot < histograms_.size(); ++slot) {
+            if (window_start_micros_[slot].has_value() && histograms_[slot]) {
+                retained.emplace_back(window_start_micros_[slot].value(), histograms_[slot]);
+            }
+        }
+    }
+
+    // Without a window width there is no span to filter by: take everything.
+    const bool take_all = (micros_per_window_ == 0) || (num_windows_ == 0);
+    const uint64_t max_span = micros_per_window_ * num_windows_;
+
+    HistogramImpl aggregated;
+    for (const auto& [start, histogram] : retained) {
+        const bool inside_span = aligned_now >= start && (aligned_now - start) < max_span;
+        if (take_all || inside_span) {
+            aggregated.MergeFrom(*histogram);
+        }
+    }
+    return aggregated.GetStats();
+}
+
+// Merges `other` into this windowed histogram. Three cases, by dynamic type:
+//   1. `other` is a plain HistogramImpl: merged into the live window.
+//   2. `other` is windowed with different parameters: all of its windows are
+//      merged into the live window.
+//   3. `other` is windowed with identical parameters: merged window by window,
+//      matched on window start timestamps.
+// Any other Histogram subtype is ignored. In every case the merge is skipped
+// when the aligned current time is earlier than the live window's start.
+// The two objects' mutexes are taken one after the other, never nested.
+void HistogramWindowingImpl::Merge(const Histogram& other) {
+    const auto* other_w = dynamic_cast<const HistogramWindowingImpl*>(&other);
+    if (!other_w) {
+        const auto* other_impl = dynamic_cast<const HistogramImpl*>(&other);
+        if (!other_impl) {
+            return;
+        }
+        // Merge a plain histogram into current window.
+        std::shared_ptr<HistogramImpl> target;
+        {
+            std::lock_guard<std::mutex> guard(mu_);
+            const uint64_t aligned_now = AlignWindowStartMicros(NowMicros());
+            if (aligned_now < current_window_start_micros_) {
+                return;
+            }
+            AdvanceLocked(aligned_now);
+            target = GetOrCreateForStartLocked(current_window_start_micros_);
+        }
+        // The merge itself runs after mu_ has been released.
+        if (target) {
+            target->MergeFrom(*other_impl);
+        }
+        return;
+    }
+
+    // If windowing parameters mismatch, fall back to merging all other 
windows into current.
+    if (num_windows_ != other_w->num_windows_ ||
+        micros_per_window_ != other_w->micros_per_window_ ||
+        min_num_per_window_ != other_w->min_num_per_window_) {
+        // Collect the other side's histograms under its own lock.
+        std::vector<std::shared_ptr<HistogramImpl>> other_only;
+        {
+            std::lock_guard<std::mutex> guard(other_w->mu_);
+            other_only.reserve(other_w->histograms_.size());
+            for (const auto& histogram : other_w->histograms_) {
+                if (histogram) {
+                    other_only.push_back(histogram);
+                }
+            }
+        }
+        // Pick this side's live window under our own lock.
+        std::shared_ptr<HistogramImpl> target;
+        {
+            std::lock_guard<std::mutex> guard(mu_);
+            const uint64_t aligned_now = AlignWindowStartMicros(NowMicros());
+            if (aligned_now < current_window_start_micros_) {
+                return;
+            }
+            AdvanceLocked(aligned_now);
+            target = GetOrCreateForStartLocked(current_window_start_micros_);
+        }
+        if (target) {
+            for (const auto& h : other_only) {
+                if (h) {
+                    target->MergeFrom(*h);
+                }
+            }
+        }
+        return;
+    }
+
+    // Merge by window start timestamps.
+    std::vector<std::pair<uint64_t, std::shared_ptr<HistogramImpl>>> 
other_hists;
+    {
+        std::lock_guard<std::mutex> guard(other_w->mu_);
+        other_hists.reserve(other_w->histograms_.size());
+        for (size_t i = 0; i < other_w->histograms_.size(); ++i) {
+            if (!other_w->window_start_micros_[i].has_value() || 
!other_w->histograms_[i]) {
+                continue;
+            }
+            other_hists.emplace_back(other_w->window_start_micros_[i].value(),
+                                     other_w->histograms_[i]);
+        }
+    }
+
+    {
+        std::lock_guard<std::mutex> guard(mu_);
+        const uint64_t aligned_now = AlignWindowStartMicros(NowMicros());
+        if (aligned_now < current_window_start_micros_) {
+            return;
+        }
+        AdvanceLocked(aligned_now);
+        const uint64_t max_span = micros_per_window_ * num_windows_;
+        for (const auto& kv : other_hists) {
+            const auto start = kv.first;
+            const auto& hist = kv.second;
+            // Only windows that started within the last max_span are merged.
+            if (aligned_now >= start && (aligned_now - start) < max_span) {
+                // Unlike the two paths above, this merge runs while mu_ is held.
+                auto target = GetOrCreateForStartLocked(start);
+                if (target && hist) {
+                    target->MergeFrom(*hist);
+                }
+            }
+        }
+    }
+}
+
+// Returns a copy with the same window parameters, the same rotation bookkeeping
+// and a per-slot clone of every retained histogram.
+std::shared_ptr<Histogram> HistogramWindowingImpl::Clone() const {
+    auto copy = std::make_shared<HistogramWindowingImpl>(num_windows_, micros_per_window_,
+                                                         min_num_per_window_);
+    std::lock_guard<std::mutex> source_lock(mu_);
+    std::lock_guard<std::mutex> copy_lock(copy->mu_);
+
+    // Replace the state the constructor set up with this object's state.
+    copy->current_index_ = current_index_;
+    copy->current_window_num_ = current_window_num_;
+    copy->current_window_start_micros_ = current_window_start_micros_;
+    copy->window_start_micros_ = window_start_micros_;
+    copy->histograms_.assign(num_windows_, nullptr);
+
+    for (size_t slot = 0; slot < histograms_.size() && slot < copy->histograms_.size(); ++slot) {
+        if (!histograms_[slot]) {
+            continue;
+        }
+        copy->histograms_[slot] =
+            std::dynamic_pointer_cast<HistogramImpl>(histograms_[slot]->Clone());
+    }
+    return copy;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/metrics/histogram_windowing.h 
b/src/paimon/common/metrics/histogram_windowing.h
new file mode 100644
index 0000000..bec9b60
--- /dev/null
+++ b/src/paimon/common/metrics/histogram_windowing.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <vector>
+
+#include "paimon/common/metrics/histogram.h"
+
+namespace paimon {
+
+// Histogram that keeps its samples in a ring of fixed-width time windows and
+// reports statistics aggregated over the recent windows.
+//
+// Modelled on RocksDB's monitoring/histogram_windowing.h.
+class HistogramWindowingImpl : public Histogram {
+ public:
+    // Defaults (paimon-java): 3 windows of 60 seconds each, with a window closing
+    // only once it holds at least 100 samples.
+    HistogramWindowingImpl() : HistogramWindowingImpl(3, 60 * 1000ULL * 1000ULL, 100) {}
+
+    // num_windows:        number of slots in the ring.
+    // micros_per_window:  width of one window, in microseconds.
+    // min_num_per_window: samples a window must hold before the ring rotates.
+    HistogramWindowingImpl(uint64_t num_windows, uint64_t micros_per_window,
+                           uint64_t min_num_per_window)
+        : num_windows_(num_windows),
+          micros_per_window_(micros_per_window),
+          min_num_per_window_(min_num_per_window) {
+        Init();
+    }
+
+    void Add(double value) override;
+    HistogramStats GetStats() const override;
+    void Merge(const Histogram& other) override;
+    std::shared_ptr<Histogram> Clone() const override;
+
+ private:
+    // Time helpers.
+    static uint64_t NowMicros();
+    uint64_t AlignWindowStartMicros(uint64_t micros) const;
+
+    // Ring management. Init() takes mu_ itself; the *Locked helpers are called
+    // with mu_ already held.
+    void Init();
+    void ResetLocked(uint64_t aligned_now);
+    void AdvanceLocked(uint64_t aligned_now);
+    size_t IndexForStartLocked(uint64_t aligned_start) const;
+    std::shared_ptr<HistogramImpl> GetOrCreateForStartLocked(uint64_t aligned_start);
+
+ private:
+    // Immutable configuration.
+    const uint64_t num_windows_;
+    const uint64_t micros_per_window_;
+    const uint64_t min_num_per_window_;
+
+    mutable std::mutex mu_;
+
+    // Everything below is protected by mu_.
+    // Live window: start timestamp, samples recorded so far, slot index.
+    uint64_t current_window_start_micros_ = 0;
+    uint64_t current_window_num_ = 0;
+    size_t current_index_ = 0;
+
+    // One entry per slot: the window's start timestamp and its histogram.
+    std::vector<std::optional<uint64_t>> window_start_micros_;
+    std::vector<std::shared_ptr<HistogramImpl>> histograms_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/metrics/histogram_windowing_test.cpp 
b/src/paimon/common/metrics/histogram_windowing_test.cpp
new file mode 100644
index 0000000..ea5083c
--- /dev/null
+++ b/src/paimon/common/metrics/histogram_windowing_test.cpp
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/histogram_windowing.h"
+
+#include <chrono>
+#include <thread>
+
+#include "gtest/gtest.h"
+
+namespace paimon::test {
+
+// Two samples recorded in different windows must both show up in the aggregate.
+TEST(HistogramWindowingImplTest, TestAdvanceAndAggregateAcrossWindows) {
+    // 4 x 25ms windows: the span is wide enough that the first sample does not sit
+    // near the expiry boundary (aligned_now - start == span) when stats are read.
+    HistogramWindowingImpl windowed(/*num_windows=*/4, /*micros_per_window=*/25000,
+                                    /*min_num_per_window=*/1);
+    windowed.Add(1);
+    // Sleep just over one window width before the second sample.
+    std::this_thread::sleep_for(std::chrono::milliseconds(26));
+    windowed.Add(2);
+
+    const HistogramStats stats = windowed.GetStats();
+    EXPECT_EQ(stats.count, 2);
+    EXPECT_DOUBLE_EQ(stats.min, 1);
+    EXPECT_DOUBLE_EQ(stats.max, 2);
+    EXPECT_NEAR(stats.average, 1.5, 1e-12);
+}
+
+// Once the live window is at least a full span old, the next Add() resets the ring
+// and only the newest sample remains.
+TEST(HistogramWindowingImplTest, TestResetWhenBeyondMaxSpan) {
+    // 2 x 1ms windows, i.e. a 2ms span.
+    HistogramWindowingImpl windowed(/*num_windows=*/2, /*micros_per_window=*/1000,
+                                    /*min_num_per_window=*/1);
+    windowed.Add(1);
+    std::this_thread::sleep_for(std::chrono::milliseconds(2));
+    windowed.Add(2);
+
+    // Wait until aligned_now - current_window_start >= micros_per_window * num_windows.
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    windowed.Add(3);
+
+    const HistogramStats stats = windowed.GetStats();
+    EXPECT_EQ(stats.count, 1);
+    EXPECT_DOUBLE_EQ(stats.min, 3);
+    EXPECT_DOUBLE_EQ(stats.max, 3);
+    EXPECT_NEAR(stats.average, 3.0, 1e-12);
+}
+
+// Merging two windowed histograms with identical parameters combines their samples.
+TEST(HistogramWindowingImplTest, TestMergeSameParams) {
+    // One-second windows keep this test independent of wall-clock timing.
+    constexpr uint64_t kWindowMicros = 1000ULL * 1000ULL;
+    HistogramWindowingImpl target(/*num_windows=*/4, /*micros_per_window=*/kWindowMicros,
+                                  /*min_num_per_window=*/1);
+    HistogramWindowingImpl source(/*num_windows=*/4, /*micros_per_window=*/kWindowMicros,
+                                  /*min_num_per_window=*/1);
+    target.Add(1);
+    target.Add(2);
+    source.Add(3);
+
+    target.Merge(source);
+
+    const HistogramStats stats = target.GetStats();
+    EXPECT_EQ(stats.count, 3);
+    EXPECT_DOUBLE_EQ(stats.sum, 6);
+    EXPECT_DOUBLE_EQ(stats.min, 1);
+    EXPECT_DOUBLE_EQ(stats.max, 3);
+}
+
+// With mismatching window counts, Merge() still carries the other side's samples over.
+TEST(HistogramWindowingImplTest, TestMergeDifferentParamsFallback) {
+    constexpr uint64_t kWindowMicros = 1000ULL * 1000ULL;
+    HistogramWindowingImpl target(/*num_windows=*/4, /*micros_per_window=*/kWindowMicros,
+                                  /*min_num_per_window=*/1);
+    HistogramWindowingImpl source(/*num_windows=*/8, /*micros_per_window=*/kWindowMicros,
+                                  /*min_num_per_window=*/1);
+    source.Add(10);
+    source.Add(20);
+
+    target.Merge(source);
+
+    const HistogramStats stats = target.GetStats();
+    EXPECT_EQ(stats.count, 2);
+    EXPECT_DOUBLE_EQ(stats.min, 10);
+    EXPECT_DOUBLE_EQ(stats.max, 20);
+}
+
+// Exercises min_num_per_window:
+//   - a window that has not reached the minimum does not rotate, even after its
+//     time slice has passed;
+//   - a histogram that fell a whole span behind is reset by the next Add().
+// The 50ms window is large on purpose to reduce wall-clock sensitivity.
+TEST(HistogramWindowingImplTest, TestMinNumPerWindow100Case) {
+    HistogramWindowingImpl windowed(/*num_windows=*/3, /*micros_per_window=*/50000,
+                                    /*min_num_per_window=*/100);
+
+    // 99 samples: one short of the minimum.
+    int32_t remaining = 99;
+    while (remaining-- > 0) {
+        windowed.Add(1);
+    }
+
+    // Let at least one window width elapse.
+    std::this_thread::sleep_for(std::chrono::milliseconds(75));
+    // 100th sample. The rotation check runs before the sample is counted, so it
+    // still sees 99 and the sample joins the original window.
+    windowed.Add(2);
+
+    const HistogramStats before_reset = windowed.GetStats();
+    EXPECT_EQ(before_reset.count, 100);
+    EXPECT_DOUBLE_EQ(before_reset.min, 1);
+    EXPECT_DOUBLE_EQ(before_reset.max, 2);
+
+    // Exceed max_span (num_windows * micros_per_window = 150ms) measured from the
+    // live window's start; the next Add() then resets the ring.
+    std::this_thread::sleep_for(std::chrono::milliseconds(175));
+    windowed.Add(1000);
+
+    const HistogramStats after_reset = windowed.GetStats();
+    EXPECT_EQ(after_reset.count, 1);
+    EXPECT_DOUBLE_EQ(after_reset.min, 1000);
+    EXPECT_DOUBLE_EQ(after_reset.max, 1000);
+}
+
+// Adds 1..n inside a single (60s) window and checks the closed-form aggregates.
+TEST(HistogramWindowingImplTest, TestLargeDatasetInSingleWindow) {
+    // A 60s window keeps the whole run inside one window regardless of wall clock.
+    HistogramWindowingImpl windowed(/*num_windows=*/4,
+                                    /*micros_per_window=*/60ULL * 1000ULL * 1000ULL,
+                                    /*min_num_per_window=*/1);
+
+    constexpr uint64_t n = 10000;
+    for (uint64_t sample = 1; sample <= n; ++sample) {
+        windowed.Add(static_cast<double>(sample));
+    }
+
+    const auto n_as_double = static_cast<double>(n);
+    const HistogramStats stats = windowed.GetStats();
+    EXPECT_EQ(stats.count, n);
+    EXPECT_DOUBLE_EQ(stats.min, 1);
+    EXPECT_DOUBLE_EQ(stats.max, n_as_double);
+    // Gauss sum: n * (n + 1) / 2.
+    EXPECT_DOUBLE_EQ(stats.sum, n_as_double * static_cast<double>(n + 1) / 2.0);
+    EXPECT_NEAR(stats.average, (n_as_double + 1.0) / 2.0, 1e-12);
+}
+
+// Populate more than one window, then let a full span pass: every earlier window
+// must be dropped and only the newest sample remain.
+TEST(HistogramWindowingImplTest, TestCrossMaxSpanAfterAdvance) {
+    // 2 x 2ms windows, i.e. max_span = 4ms.
+    HistogramWindowingImpl windowed(/*num_windows=*/2, /*micros_per_window=*/2000,
+                                    /*min_num_per_window=*/1);
+
+    windowed.Add(1);
+    std::this_thread::sleep_for(std::chrono::milliseconds(3));
+    windowed.Add(2);  // expected to land in the next window.
+
+    // Exceed max_span (4ms) measured from the live window's start.
+    std::this_thread::sleep_for(std::chrono::milliseconds(7));
+    windowed.Add(3);  // expected to trigger a reset.
+
+    const HistogramStats stats = windowed.GetStats();
+    EXPECT_EQ(stats.count, 1);
+    EXPECT_DOUBLE_EQ(stats.min, 3);
+    EXPECT_DOUBLE_EQ(stats.max, 3);
+    EXPECT_DOUBLE_EQ(stats.sum, 3);
+}
+
+// A cloned windowed histogram reports the same stats as its source and evolves
+// independently afterwards.
+TEST(HistogramWindowingImplTest, TestCloneConsistencyAndIndependence) {
+    // A 60s window rules out window rotation during the test.
+    HistogramWindowingImpl source(/*num_windows=*/4,
+                                  /*micros_per_window=*/60ULL * 1000ULL * 1000ULL,
+                                  /*min_num_per_window=*/1);
+    for (const double sample : {1.0, 2.0, 3.0, 4.0}) {
+        source.Add(sample);
+    }
+
+    const HistogramStats source_stats = source.GetStats();
+
+    const std::shared_ptr<Histogram> clone_as_base = source.Clone();
+    const auto clone = std::dynamic_pointer_cast<HistogramWindowingImpl>(clone_as_base);
+    ASSERT_TRUE(clone != nullptr);
+
+    const HistogramStats clone_stats = clone->GetStats();
+    EXPECT_EQ(clone_stats.count, source_stats.count);
+    EXPECT_DOUBLE_EQ(clone_stats.sum, source_stats.sum);
+    EXPECT_DOUBLE_EQ(clone_stats.min, source_stats.min);
+    EXPECT_DOUBLE_EQ(clone_stats.max, source_stats.max);
+    EXPECT_DOUBLE_EQ(clone_stats.average, source_stats.average);
+    EXPECT_DOUBLE_EQ(clone_stats.stddev, source_stats.stddev);
+    EXPECT_DOUBLE_EQ(clone_stats.p50, source_stats.p50);
+    EXPECT_DOUBLE_EQ(clone_stats.p90, source_stats.p90);
+    EXPECT_DOUBLE_EQ(clone_stats.p95, source_stats.p95);
+    EXPECT_DOUBLE_EQ(clone_stats.p99, source_stats.p99);
+    EXPECT_DOUBLE_EQ(clone_stats.p999, source_stats.p999);
+
+    // Writing to the source leaves the clone untouched.
+    source.Add(100);
+    const HistogramStats source_after_add = source.GetStats();
+    const HistogramStats clone_after_source_add = clone->GetStats();
+    EXPECT_EQ(clone_after_source_add.count, clone_stats.count);
+    EXPECT_EQ(source_after_add.count, clone_stats.count + 1);
+
+    // Writing to the clone leaves the source untouched.
+    clone->Add(200);
+    const HistogramStats clone_after_own_add = clone->GetStats();
+    const HistogramStats source_after_clone_add = source.GetStats();
+    EXPECT_EQ(source_after_clone_add.count, source_after_add.count);
+    EXPECT_EQ(clone_after_own_add.count, clone_stats.count + 1);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/metrics/metrics_impl.cpp 
b/src/paimon/common/metrics/metrics_impl.cpp
new file mode 100644
index 0000000..87d8768
--- /dev/null
+++ b/src/paimon/common/metrics/metrics_impl.cpp
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/metrics_impl.h"
+
+#include <cmath>
+#include <utility>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/metrics/histogram_windowing.h"
+#include "paimon/result.h"
+#include "rapidjson/document.h"
+#include "rapidjson/encodings.h"
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace paimon {
+// Records `metric_value` under `metric_name`, replacing any value stored there before.
+void MetricsImpl::SetCounter(const std::string& metric_name, uint64_t metric_value) {
+    std::lock_guard<std::mutex> guard(counter_lock_);
+    counters_.insert_or_assign(metric_name, metric_value);
+}
+
+// Returns the counter stored under `metric_name`, or a KeyError status when the name is unknown.
+Result<uint64_t> MetricsImpl::GetCounter(const std::string& metric_name) const {
+    std::lock_guard<std::mutex> guard(counter_lock_);
+    const auto found = counters_.find(metric_name);
+    if (found == counters_.end()) {
+        return Status::KeyError(fmt::format("metric '{}' not found", metric_name));
+    }
+    return found->second;
+}
+
+// Returns a copy of the whole counter map, taken while holding the counter lock.
+std::map<std::string, uint64_t> MetricsImpl::GetAllCounters() const {
+    std::lock_guard<std::mutex> guard(counter_lock_);
+    std::map<std::string, uint64_t> snapshot(counters_);
+    return snapshot;
+}
+
+// Adds `value` to the histogram named `metric_name`, creating a HistogramWindowingImpl on first
+// use. The map lock only covers the lookup/insert; Add() is called after it has been released.
+void MetricsImpl::ObserveHistogram(const std::string& metric_name, double value) {
+    std::shared_ptr<Histogram> target;
+    {
+        std::lock_guard<std::mutex> guard(histogram_lock_);
+        auto pos = histograms_.find(metric_name);
+        if (pos == histograms_.end()) {
+            pos = histograms_.emplace(metric_name, std::make_shared<HistogramWindowingImpl>())
+                      .first;
+        }
+        target = pos->second;
+    }
+    target->Add(value);
+}
+
+// Returns the current stats of the histogram named `metric_name`, or a KeyError status when no
+// such histogram exists. GetStats() is called after the map lock has been released.
+Result<HistogramStats> MetricsImpl::GetHistogramStats(const std::string& metric_name) const {
+    std::shared_ptr<Histogram> target;
+    {
+        std::lock_guard<std::mutex> guard(histogram_lock_);
+        const auto pos = histograms_.find(metric_name);
+        if (pos == histograms_.end()) {
+            return Status::KeyError(fmt::format("histogram '{}' not found", metric_name));
+        }
+        target = pos->second;
+    }
+    return target->GetStats();
+}
+
+// Returns the stats of every registered histogram, keyed by histogram name.
+// The (name, histogram) pairs are copied under the map lock; GetStats() runs afterwards.
+std::map<std::string, HistogramStats> MetricsImpl::GetAllHistogramStats() const {
+    std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> snapshot;
+    {
+        std::lock_guard<std::mutex> guard(histogram_lock_);
+        snapshot.assign(histograms_.begin(), histograms_.end());
+    }
+
+    std::map<std::string, HistogramStats> stats_by_name;
+    for (const auto& entry : snapshot) {
+        stats_by_name.emplace(entry.first, entry.second->GetStats());
+    }
+    return stats_by_name;
+}
+
+// Records `value` under `metric_name`, replacing any gauge value stored there before.
+void MetricsImpl::SetGauge(const std::string& metric_name, double value) {
+    std::lock_guard<std::mutex> guard(gauge_lock_);
+    gauges_.insert_or_assign(metric_name, value);
+}
+
+// Returns the gauge stored under `metric_name`, or a KeyError status when the name is unknown.
+Result<double> MetricsImpl::GetGauge(const std::string& metric_name) const {
+    std::lock_guard<std::mutex> guard(gauge_lock_);
+    const auto found = gauges_.find(metric_name);
+    if (found == gauges_.end()) {
+        return Status::KeyError(fmt::format("metric '{}' not found", metric_name));
+    }
+    return found->second;
+}
+
+// Returns a copy of the whole gauge map, taken while holding the gauge lock.
+std::map<std::string, double> MetricsImpl::GetAllGauges() const {
+    std::lock_guard<std::mutex> guard(gauge_lock_);
+    std::map<std::string, double> snapshot(gauges_);
+    return snapshot;
+}
+
+// Folds the metrics held by `other` into this instance.
+//
+// Counters and gauges that exist on both sides are summed; names that only exist in `other` are
+// copied over. Histograms are merged only when `other` is a MetricsImpl, because they are read
+// from its histogram map directly. A null `other` or a self-merge is a no-op.
+void MetricsImpl::Merge(const std::shared_ptr<Metrics>& other) {
+    if (!other || other.get() == this) {
+        return;
+    }
+
+    // Copy the source values first, then update our own map under our own lock.
+    const std::map<std::string, uint64_t> incoming_counters = other->GetAllCounters();
+    {
+        std::lock_guard<std::mutex> guard(counter_lock_);
+        for (const auto& [name, amount] : incoming_counters) {
+            // operator[] value-initializes a missing counter to 0 before the addition.
+            counters_[name] += amount;
+        }
+    }
+
+    const std::map<std::string, double> incoming_gauges = other->GetAllGauges();
+    {
+        std::lock_guard<std::mutex> guard(gauge_lock_);
+        for (const auto& [name, amount] : incoming_gauges) {
+            auto [pos, inserted] = gauges_.try_emplace(name, amount);
+            if (!inserted) {
+                pos->second += amount;
+            }
+        }
+    }
+
+    const auto other_impl = std::dynamic_pointer_cast<MetricsImpl>(other);
+    if (!other_impl) {
+        return;
+    }
+
+    // Copy the source (name, histogram) pairs under the source lock only.
+    std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> incoming_histograms;
+    {
+        std::lock_guard<std::mutex> guard(other_impl->histogram_lock_);
+        incoming_histograms.assign(other_impl->histograms_.begin(),
+                                   other_impl->histograms_.end());
+    }
+
+    for (const auto& [name, source_hist] : incoming_histograms) {
+        std::shared_ptr<Histogram> target_hist;
+        {
+            std::lock_guard<std::mutex> guard(histogram_lock_);
+            auto pos = histograms_.find(name);
+            if (pos == histograms_.end()) {
+                pos = histograms_.emplace(name, std::make_shared<HistogramWindowingImpl>()).first;
+            }
+            target_hist = pos->second;
+        }
+        // The histogram merge itself runs without holding either map lock.
+        target_hist->Merge(*source_hist);
+    }
+}
+
+// Replaces the contents of this instance with those of `other`.
+//
+// Counters and gauges are replaced by copies obtained from `other`. Histograms are replaced by
+// clones of the histograms of `other` when it is a MetricsImpl; otherwise the histogram map ends
+// up empty. A null `other` or a self-overwrite is a no-op.
+void MetricsImpl::Overwrite(const std::shared_ptr<Metrics>& other) {
+    if (!other || other.get() == this) {
+        return;
+    }
+
+    std::map<std::string, uint64_t> replacement_counters = other->GetAllCounters();
+    {
+        std::lock_guard<std::mutex> guard(counter_lock_);
+        counters_.swap(replacement_counters);
+    }
+
+    std::map<std::string, double> replacement_gauges = other->GetAllGauges();
+    {
+        std::lock_guard<std::mutex> guard(gauge_lock_);
+        gauges_.swap(replacement_gauges);
+    }
+
+    std::map<std::string, std::shared_ptr<Histogram>> replacement_histograms;
+    if (const auto other_impl = std::dynamic_pointer_cast<MetricsImpl>(other)) {
+        std::vector<std::pair<std::string, std::shared_ptr<Histogram>>> source_histograms;
+        {
+            std::lock_guard<std::mutex> guard(other_impl->histogram_lock_);
+            source_histograms.assign(other_impl->histograms_.begin(),
+                                     other_impl->histograms_.end());
+        }
+        // Clone after the source map lock has been released.
+        for (const auto& entry : source_histograms) {
+            replacement_histograms.emplace(entry.first, entry.second->Clone());
+        }
+    }
+    {
+        std::lock_guard<std::mutex> guard(histogram_lock_);
+        histograms_.swap(replacement_histograms);
+    }
+}
+
+// Serializes all counters, histogram statistics and gauges into one flat JSON object.
+//
+// Members are appended in this order: counters, then histogram stats, then gauges. Each histogram
+// is flattened into numeric key/value pairs to keep the JSON schema simple; the keys are
+// "<name>.count", ".sum", ".min", ".max", ".avg", ".p50", ".p90", ".p95", ".p99", ".p99.9" and
+// ".stddev".
+//
+// Non-finite doubles (NaN, +/-Inf) are written as JSON null. rapidjson's Writer rejects such
+// values unless kWriteNanAndInfFlag is set, which would stop serialization part-way and return a
+// truncated, invalid document.
+std::string MetricsImpl::ToString() const {
+    using RapidWriter =
+        rapidjson::Writer<rapidjson::StringBuffer, rapidjson::UTF8<>, rapidjson::ASCII<>>;
+    rapidjson::Document doc;
+    doc.SetObject();
+    rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
+
+    // Appends `key: value` to the document, substituting null for values JSON cannot represent.
+    const auto add_double = [&doc, &allocator](const std::string& key, double value) {
+        rapidjson::Value json_value;  // A default-constructed Value is JSON null.
+        if (std::isfinite(value)) {
+            json_value.SetDouble(value);
+        }
+        doc.AddMember(rapidjson::Value(key, allocator), json_value, allocator);
+    };
+
+    const std::map<std::string, uint64_t> counters = GetAllCounters();
+    for (const auto& [name, count] : counters) {
+        doc.AddMember(rapidjson::Value(name, allocator), rapidjson::Value(count), allocator);
+    }
+
+    const std::map<std::string, HistogramStats> hist_stats = GetAllHistogramStats();
+    for (const auto& [name, stats] : hist_stats) {
+        doc.AddMember(rapidjson::Value(name + ".count", allocator),
+                      rapidjson::Value(stats.count), allocator);
+        add_double(name + ".sum", stats.sum);
+        add_double(name + ".min", stats.min);
+        add_double(name + ".max", stats.max);
+        add_double(name + ".avg", stats.average);
+        add_double(name + ".p50", stats.p50);
+        add_double(name + ".p90", stats.p90);
+        add_double(name + ".p95", stats.p95);
+        add_double(name + ".p99", stats.p99);
+        add_double(name + ".p99.9", stats.p999);
+        add_double(name + ".stddev", stats.stddev);
+    }
+
+    const std::map<std::string, double> gauges = GetAllGauges();
+    for (const auto& [name, gauge] : gauges) {
+        add_double(name, gauge);
+    }
+
+    rapidjson::StringBuffer buffer;
+    RapidWriter writer(buffer);
+    doc.Accept(writer);
+    return buffer.GetString();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/metrics/metrics_impl.h 
b/src/paimon/common/metrics/metrics_impl.h
new file mode 100644
index 0000000..a36a59c
--- /dev/null
+++ b/src/paimon/common/metrics/metrics_impl.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "paimon/metrics.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+class Histogram;
+
+/// In-memory implementation of the Metrics interface.
+///
+/// Counters, histograms and gauges live in three separate maps, each guarded by its own mutex.
+class PAIMON_EXPORT MetricsImpl : public Metrics {
+ public:
+    ~MetricsImpl() override = default;
+
+    // Counters: unsigned integer values addressed by name.
+    void SetCounter(const std::string& metric_name, uint64_t metric_value) override;
+    Result<uint64_t> GetCounter(const std::string& metric_name) const override;
+    std::map<std::string, uint64_t> GetAllCounters() const override;
+
+    // Histograms: named distributions fed one observation at a time.
+    void ObserveHistogram(const std::string& metric_name, double value) override;
+    Result<HistogramStats> GetHistogramStats(const std::string& metric_name) const override;
+    std::map<std::string, HistogramStats> GetAllHistogramStats() const override;
+
+    // Gauges: floating point values addressed by name.
+    void SetGauge(const std::string& metric_name, double metric_value) override;
+    Result<double> GetGauge(const std::string& metric_name) const override;
+    std::map<std::string, double> GetAllGauges() const override;
+
+    void Merge(const std::shared_ptr<Metrics>& other) override;
+    std::string ToString() const override;
+    /// Not part of the Metrics interface (no `override`); declared on MetricsImpl only.
+    void Overwrite(const std::shared_ptr<Metrics>& metrics);
+
+    /// Merges the metrics of every non-null element of `readers` into a fresh MetricsImpl.
+    ///
+    /// Each non-null element must provide `GetReaderMetrics()`; elements whose metrics evaluate
+    /// to false are skipped.
+    template <typename T>
+    static std::shared_ptr<Metrics> CollectReadMetrics(const T& readers) {
+        auto merged = std::make_shared<MetricsImpl>();
+        for (const auto& reader : readers) {
+            if (reader) {
+                if (auto reader_metrics = reader->GetReaderMetrics()) {
+                    merged->Merge(reader_metrics);
+                }
+            }
+        }
+        return merged;
+    }
+
+ private:
+    // Guards counters_.
+    mutable std::mutex counter_lock_;
+    std::map<std::string, uint64_t> counters_;
+
+    // Guards histograms_ (the map itself; histogram objects are used outside the lock).
+    mutable std::mutex histogram_lock_;
+    std::map<std::string, std::shared_ptr<Histogram>> histograms_;
+
+    // Guards gauges_.
+    mutable std::mutex gauge_lock_;
+    std::map<std::string, double> gauges_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/metrics/metrics_impl_test.cpp 
b/src/paimon/common/metrics/metrics_impl_test.cpp
new file mode 100644
index 0000000..b481988
--- /dev/null
+++ b/src/paimon/common/metrics/metrics_impl_test.cpp
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/metrics/metrics_impl.h"
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Covers counter set/get, Merge (sums shared names, copies new ones) and Overwrite (replaces all).
+TEST(MetricsImplTest, TestSimple) {
+    auto metrics = std::make_shared<MetricsImpl>();
+    metrics->SetCounter("some_metric", 100);
+    metrics->SetCounter("some_metric_2", 150);
+
+    ASSERT_OK_AND_ASSIGN(uint64_t counter, metrics->GetCounter("some_metric"));
+    ASSERT_EQ(100, counter);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric_2"));
+    ASSERT_EQ(150, counter);
+
+    auto other = std::make_shared<MetricsImpl>();
+    other->SetCounter("some_metric_2", 200);
+    other->SetCounter("some_metric_3", 300);
+
+    // After merging: untouched, summed and newly copied counters respectively.
+    metrics->Merge(other);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric"));
+    ASSERT_EQ(100, counter);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric_2"));
+    ASSERT_EQ(350, counter);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric_3"));
+    ASSERT_EQ(300, counter);
+
+    metrics->Overwrite(other);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric_2"));
+    ASSERT_EQ(200, counter);
+    ASSERT_OK_AND_ASSIGN(counter, metrics->GetCounter("some_metric_3"));
+    ASSERT_EQ(300, counter);
+    // some_metric must no longer exist after the overwrite.
+    ASSERT_NOK_WITH_MSG(metrics->GetCounter("some_metric"),
+                        "Key error: metric 'some_metric' not found");
+}
+
+// Covers gauge Merge (sums shared names, copies new ones) and Overwrite (replaces all gauges).
+TEST(MetricsImplTest, TestGaugeMergeAndOverwrite) {
+    auto metrics = std::make_shared<MetricsImpl>();
+    metrics->SetGauge("g1", 1.5);
+    metrics->SetGauge("g2", 2.0);
+
+    auto other = std::make_shared<MetricsImpl>();
+    other->SetGauge("g2", 3.25);
+    other->SetGauge("g3", 4.75);
+
+    // g1 is untouched, g2 is summed (2.0 + 3.25), g3 is copied.
+    metrics->Merge(other);
+    ASSERT_OK_AND_ASSIGN(double gauge, metrics->GetGauge("g1"));
+    EXPECT_DOUBLE_EQ(1.5, gauge);
+    ASSERT_OK_AND_ASSIGN(gauge, metrics->GetGauge("g2"));
+    EXPECT_DOUBLE_EQ(5.25, gauge);
+    ASSERT_OK_AND_ASSIGN(gauge, metrics->GetGauge("g3"));
+    EXPECT_DOUBLE_EQ(4.75, gauge);
+
+    // Only the gauges of `other` remain after the overwrite.
+    metrics->Overwrite(other);
+    ASSERT_OK_AND_ASSIGN(gauge, metrics->GetGauge("g2"));
+    EXPECT_DOUBLE_EQ(3.25, gauge);
+    ASSERT_OK_AND_ASSIGN(gauge, metrics->GetGauge("g3"));
+    EXPECT_DOUBLE_EQ(4.75, gauge);
+    ASSERT_NOK_WITH_MSG(metrics->GetGauge("g1"), "Key error: metric 'g1' not found");
+}
+
+// Checks the JSON produced by ToString after a merge: counters come first, then gauges.
+TEST(MetricsImplTest, TestToString) {
+    auto merged = std::make_shared<MetricsImpl>();
+    merged->SetCounter("k1", 1);
+    merged->SetCounter("k2", 2);
+    merged->SetGauge("g1", 1.25);
+
+    auto incoming = std::make_shared<MetricsImpl>();
+    incoming->SetCounter("m1", 3);
+    incoming->SetCounter("m2", 4);
+    incoming->SetCounter("k2", 5);
+    incoming->SetGauge("g2", 2.5);
+
+    merged->Merge(incoming);
+    EXPECT_EQ(merged->ToString(),
+              "{\"k1\":1,\"k2\":7,\"m1\":3,\"m2\":4,\"g1\":1.25,\"g2\":2.5}");
+}
+
+}  // namespace paimon::test

Reply via email to