This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new dff2cbf  feat: add StringUtils, DateTimeUtils, PathUtil, OptionsUtils, 
RapidJsonUtil, and Jsonizable utilities (#23)
dff2cbf is described below

commit dff2cbf2eed34f442f5ef6b90f4aaf43fc2bfebd
Author: dalingmeng <[email protected]>
AuthorDate: Fri May 29 09:48:13 2026 +0800

    feat: add StringUtils, DateTimeUtils, PathUtil, OptionsUtils, 
RapidJsonUtil, and Jsonizable utilities (#23)
    
    Squash merge PR #23.
---
 src/paimon/common/utils/date_time_utils.h        | 228 +++++++++++
 src/paimon/common/utils/date_time_utils_test.cpp | 337 ++++++++++++++++
 src/paimon/common/utils/jsonizable.h             |  61 +++
 src/paimon/common/utils/jsonizable_test.cpp      | 194 +++++++++
 src/paimon/common/utils/options_utils.h          | 105 +++++
 src/paimon/common/utils/options_utils_test.cpp   |  72 ++++
 src/paimon/common/utils/path_util.cpp            | 156 +++++++
 src/paimon/common/utils/path_util.h              |  58 +++
 src/paimon/common/utils/path_util_test.cpp       | 152 +++++++
 src/paimon/common/utils/rapidjson_util.h         | 441 ++++++++++++++++++++
 src/paimon/common/utils/rapidjson_util_test.cpp  | 147 +++++++
 src/paimon/common/utils/string_utils.cpp         | 224 +++++++++++
 src/paimon/common/utils/string_utils.h           | 209 ++++++++++
 src/paimon/common/utils/string_utils_test.cpp    | 491 +++++++++++++++++++++++
 src/paimon/testing/utils/timezone_guard.h        |  54 +++
 15 files changed, 2929 insertions(+)

diff --git a/src/paimon/common/utils/date_time_utils.h 
b/src/paimon/common/utils/date_time_utils.h
new file mode 100644
index 0000000..652c312
--- /dev/null
+++ b/src/paimon/common/utils/date_time_utils.h
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <sys/time.h>
+
+#include <cassert>
+#include <cstdint>
+#include <ctime>
+#include <limits>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/compute/api.h"
+#include "arrow/vendored/datetime.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+namespace paimon {
+/// Utils for date time.
+class DateTimeUtils {
+ public:
+    DateTimeUtils() = delete;
+    ~DateTimeUtils() = delete;
+
+    /// The number of milliseconds in a day.
+    ///
+    /// This is the modulo 'mask' used when converting TIMESTAMP values to 
DATE and TIME values.
+    static constexpr int64_t MILLIS_PER_DAY = 86400000l;  // = 24 * 60 * 60 * 
1000
+    static constexpr int64_t SECONDS_PER_DAY = 86400l;    // = 24 * 60 * 60
+    static constexpr int64_t NANOS_PER_MILLIS = 1000000l;
+    enum TimeType {
+        SECOND = 0,
+        MILLISECOND = 1,
+        MICROSECOND = 2,
+        NANOSECOND = 3,
+    };
+    constexpr static int64_t CONVERSION_FACTORS[] = {1L, 1000L, 1000000L, 
1000000000L};
+
+    // convert a timestamp of a certain type into a combination of two 
specified types
+    // e.g., src_timestamp = 12345678, src_type = ns, dst_first_type = ms, 
dst_second_type = ns
+    // return: {12, 345678}
+    static std::pair<int64_t, int64_t> TimestampConverter(int64_t 
src_timestamp,
+                                                          const TimeType& 
src_type,
+                                                          const TimeType& 
dst_first_type,
+                                                          const TimeType& 
dst_second_type) {
+        if (src_type <= dst_first_type) {
+            // e.g., ms -> {us, ns} or {ms, ns} or {us, us} or {ns, ms}
+            int64_t conversion_factor_to_first_type =
+                CONVERSION_FACTORS[dst_first_type] / 
CONVERSION_FACTORS[src_type];
+            // TODO(jinli.zjw): maybe overflow int64
+            assert(src_timestamp * conversion_factor_to_first_type <
+                   std::numeric_limits<int64_t>::max());
+            return std::make_pair(src_timestamp * 
conversion_factor_to_first_type, 0L);
+        } else {
+            // e.g., ns -> {ms, ns} or {ms, s} or {ms, us}
+            int64_t conversion_factor_to_first_type =
+                CONVERSION_FACTORS[src_type] / 
CONVERSION_FACTORS[dst_first_type];
+            double conversion_factor_to_second_type =
+                static_cast<double>(CONVERSION_FACTORS[dst_second_type]) /
+                CONVERSION_FACTORS[src_type];
+
+            int64_t first_value = src_timestamp / 
conversion_factor_to_first_type;
+            int64_t second_value = src_timestamp % 
conversion_factor_to_first_type;
+            if (second_value < 0) {
+                second_value += conversion_factor_to_first_type;
+                first_value--;
+            }
+            second_value = conversion_factor_to_second_type * second_value;
+            return std::make_pair(first_value, second_value);
+        }
+    }
+
+    static int64_t TimestampToInteger(const Timestamp& timestamp, const 
TimeType& dst_type) {
+        if (dst_type == TimeType::SECOND) {
+            return timestamp.GetMillisecond() / 
CONVERSION_FACTORS[MILLISECOND];
+        } else if (dst_type == TimeType::MILLISECOND) {
+            return timestamp.GetMillisecond();
+        } else if (dst_type == TimeType::MICROSECOND) {
+            return timestamp.ToMicrosecond();
+        }
+        return timestamp.ToNanosecond();
+    }
+
+    static inline uint64_t GetCurrentUTCTimeUs() {
+        struct timeval ts;
+        gettimeofday(&ts, nullptr);
+        return static_cast<uint64_t>(ts.tv_sec) * 1000000ULL + 
static_cast<uint64_t>(ts.tv_usec);
+    }
+
+    static inline Result<Timestamp> ToLocalTimestamp(const Timestamp& 
utc_timestamp) {
+        int64_t utc_micro = utc_timestamp.ToMicrosecond();
+        auto utc_ts_scalar = std::make_shared<arrow::TimestampScalar>(
+            utc_micro, arrow::TimeUnit::MICRO, GetLocalTimezoneName());
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+            arrow::Datum local_micro, 
arrow::compute::LocalTimestamp(arrow::Datum(utc_ts_scalar)));
+        auto local_ts_scalar =
+            
std::dynamic_pointer_cast<arrow::TimestampScalar>(local_micro.scalar());
+        auto [millisecond, nano_of_millisecond] = 
DateTimeUtils::TimestampConverter(
+            *(static_cast<const int64_t*>(local_ts_scalar->data())),
+            DateTimeUtils::TimeType::MICROSECOND, 
DateTimeUtils::TimeType::MILLISECOND,
+            DateTimeUtils::TimeType::NANOSECOND);
+        return Timestamp(millisecond, nano_of_millisecond);
+    }
+
+    static inline Result<uint64_t> GetCurrentLocalTimeUs() {
+        auto [millisecond, nano_of_millisecond] = 
DateTimeUtils::TimestampConverter(
+            GetCurrentUTCTimeUs(), DateTimeUtils::TimeType::MICROSECOND,
+            DateTimeUtils::TimeType::MILLISECOND, 
DateTimeUtils::TimeType::NANOSECOND);
+        Timestamp utc_timestamp(millisecond, nano_of_millisecond);
+        PAIMON_ASSIGN_OR_RAISE(Timestamp local_timestamp, 
ToLocalTimestamp(utc_timestamp));
+        return local_timestamp.ToMicrosecond();
+    }
+
+    static inline Result<int32_t> GetCurrentLocalHour() {
+        PAIMON_ASSIGN_OR_RAISE(uint64_t local_us, GetCurrentLocalTimeUs());
+        auto local_seconds = static_cast<time_t>(local_us / 1000000);
+        std::tm local_tm{};
+        gmtime_r(&local_seconds, &local_tm);
+        return local_tm.tm_hour;
+    }
+
+    static inline int32_t GetPrecisionFromType(
+        const std::shared_ptr<arrow::TimestampType>& timestamp_type) {
+        int32_t precision = Timestamp::MAX_PRECISION;
+        if (timestamp_type->unit() == arrow::TimeUnit::type::SECOND) {
+            precision = Timestamp::MIN_PRECISION;
+        } else if (timestamp_type->unit() == arrow::TimeUnit::type::MILLI) {
+            precision = Timestamp::MILLIS_PRECISION;
+        } else if (timestamp_type->unit() == arrow::TimeUnit::type::MICRO) {
+            precision = Timestamp::DEFAULT_PRECISION;
+        }
+        return precision;
+    }
+
+    static inline TimeType GetTimeTypeFromArrowType(
+        const std::shared_ptr<arrow::TimestampType>& timestamp_type) {
+        if (timestamp_type->unit() == arrow::TimeUnit::type::SECOND) {
+            return TimeType::SECOND;
+        } else if (timestamp_type->unit() == arrow::TimeUnit::type::MILLI) {
+            return TimeType::MILLISECOND;
+        } else if (timestamp_type->unit() == arrow::TimeUnit::type::MICRO) {
+            return TimeType::MICROSECOND;
+        }
+        return TimeType::NANOSECOND;
+    }
+
+    static inline Result<std::shared_ptr<arrow::DataType>> 
GetTypeFromPrecision(
+        int32_t precision, bool with_timezone) {
+        std::string timezone = with_timezone ? GetLocalTimezoneName() : "";
+        if (precision == Timestamp::MIN_PRECISION) {
+            return arrow::timestamp(arrow::TimeUnit::type::SECOND, timezone);
+        } else if (precision == Timestamp::MILLIS_PRECISION) {
+            return arrow::timestamp(arrow::TimeUnit::type::MILLI, timezone);
+        } else if (precision == Timestamp::DEFAULT_PRECISION) {
+            return arrow::timestamp(arrow::TimeUnit::type::MICRO, timezone);
+        } else if (precision == Timestamp::MAX_PRECISION) {
+            return arrow::timestamp(arrow::TimeUnit::type::NANO, timezone);
+        }
+        return Status::Invalid("only support precision 0/3/6/9 in timestamp 
type");
+    }
+
+    static std::string GetLocalTimezoneName() {
+        // find local tz in env
+        const char* timezone = std::getenv("TZ");
+        if (timezone != nullptr && *timezone != '\0') {
+            return std::string(timezone);
+        }
+        // find local tz in file
+        auto* tz = arrow_vendored::date::current_zone();
+        return tz ? tz->name() : "UTC";
+    }
+
+    static std::string GetArrowTimeUnitStr(arrow::TimeUnit::type unit) {
+        switch (unit) {
+            case arrow::TimeUnit::SECOND:
+                return "SECOND";
+            case arrow::TimeUnit::MILLI:
+                return "MILLISECOND";
+            case arrow::TimeUnit::MICRO:
+                return "MICROSECOND";
+            case arrow::TimeUnit::NANO:
+                return "NANOSECOND";
+            default:
+                break;
+        }
+        return "UNKNOWN";
+    }
+
+    // there may be a precision loss for nano
+    static Result<Timestamp> ToUTCTimestamp(const Timestamp& timestamp) {
+        int64_t micro_second = timestamp.ToMicrosecond();
+        auto local_ts_scalar =
+            std::make_shared<arrow::TimestampScalar>(micro_second, 
arrow::TimeUnit::MICRO);
+        arrow::compute::AssumeTimezoneOptions 
options(DateTimeUtils::GetLocalTimezoneName());
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+            arrow::Datum target_scalar,
+            arrow::compute::AssumeTimezone(arrow::Datum(local_ts_scalar), 
options));
+        auto utc_ts_scalar =
+            
std::dynamic_pointer_cast<arrow::TimestampScalar>(target_scalar.scalar());
+        auto [milli, nano] = DateTimeUtils::TimestampConverter(
+            *(static_cast<const int64_t*>(utc_ts_scalar->data())),
+            DateTimeUtils::TimeType::MICROSECOND, 
DateTimeUtils::TimeType::MILLISECOND,
+            DateTimeUtils::TimeType::NANOSECOND);
+        return Timestamp(milli, nano);
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/date_time_utils_test.cpp 
b/src/paimon/common/utils/date_time_utils_test.cpp
new file mode 100644
index 0000000..e012172
--- /dev/null
+++ b/src/paimon/common/utils/date_time_utils_test.cpp
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/date_time_utils.h"
+
+#include <memory>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/testing/utils/timezone_guard.h"
+
+namespace paimon::test {
+
+TEST(DateTimeUtilsTest, TestTimestampConverter) {
+    {
+        auto ret = DateTimeUtils::TimestampConverter(
+            10L, DateTimeUtils::SECOND, DateTimeUtils::MILLISECOND, 
DateTimeUtils::NANOSECOND);
+        ASSERT_EQ(ret, std::make_pair(10000L, 0L));
+    }
+    {
+        auto ret = DateTimeUtils::TimestampConverter(
+            10L, DateTimeUtils::SECOND, DateTimeUtils::NANOSECOND, 
DateTimeUtils::NANOSECOND);
+        ASSERT_EQ(ret, std::make_pair(10000000000L, 0L));
+    }
+    {
+        auto ret = DateTimeUtils::TimestampConverter(
+            10L, DateTimeUtils::SECOND, DateTimeUtils::NANOSECOND, 
DateTimeUtils::SECOND);
+        ASSERT_EQ(ret, std::make_pair(10000000000L, 0L));
+    }
+
+    {
+        auto ret = DateTimeUtils::TimestampConverter(
+            2567L, DateTimeUtils::MILLISECOND, DateTimeUtils::SECOND, 
DateTimeUtils::NANOSECOND);
+        ASSERT_EQ(ret, std::make_pair(2L, 567000000L));
+    }
+    {
+        auto ret = DateTimeUtils::TimestampConverter(2567L, 
DateTimeUtils::MILLISECOND,
+                                                     DateTimeUtils::SECOND, 
DateTimeUtils::SECOND);
+        ASSERT_EQ(ret, std::make_pair(2L, 0L));
+    }
+    {
+        auto ret = DateTimeUtils::TimestampConverter(
+            2567L, DateTimeUtils::MILLISECOND, DateTimeUtils::MICROSECOND, 
DateTimeUtils::SECOND);
+        ASSERT_EQ(ret, std::make_pair(2567000L, 0L));
+    }
+    {
+        auto ret = DateTimeUtils::TimestampConverter(2567L, 
DateTimeUtils::MILLISECOND,
+                                                     
DateTimeUtils::MICROSECOND,
+                                                     
DateTimeUtils::NANOSECOND);
+        ASSERT_EQ(ret, std::make_pair(2567000L, 0L));
+    }
+
+    {
+        auto ret = DateTimeUtils::TimestampConverter(12345678L, 
DateTimeUtils::NANOSECOND,
+                                                     
DateTimeUtils::MILLISECOND,
+                                                     
DateTimeUtils::NANOSECOND);
+        ASSERT_EQ(ret, std::make_pair(12L, 345678L));
+    }
+    {
+        auto ret = DateTimeUtils::TimestampConverter(12345678L, 
DateTimeUtils::NANOSECOND,
+                                                     
DateTimeUtils::MILLISECOND,
+                                                     
DateTimeUtils::MICROSECOND);
+        ASSERT_EQ(ret, std::make_pair(12L, 345L));
+    }
+    {
+        auto ret =
+            DateTimeUtils::TimestampConverter(12345678L, 
DateTimeUtils::NANOSECOND,
+                                              DateTimeUtils::MILLISECOND, 
DateTimeUtils::SECOND);
+        ASSERT_EQ(ret, std::make_pair(12L, 0L));
+    }
+    {
+        auto ret = DateTimeUtils::TimestampConverter(
+            -2240521239998998999L, DateTimeUtils::NANOSECOND, 
DateTimeUtils::MILLISECOND,
+            DateTimeUtils::NANOSECOND);
+        ASSERT_EQ(ret, std::make_pair(-2240521239999L, 1001L));
+    }
+    {
+        // 9999-12-31
+        auto ret = DateTimeUtils::TimestampConverter(253402252995L, 
DateTimeUtils::SECOND,
+                                                     
DateTimeUtils::MILLISECOND,
+                                                     
DateTimeUtils::NANOSECOND);
+        ASSERT_EQ(ret, std::make_pair(253402252995000L, 0L));
+    }
+}
+
+TEST(DateTimeUtilsTest, TestTimestampToInteger) {
+    {
+        ASSERT_EQ(DateTimeUtils::TimestampToInteger(Timestamp(1758173447000l, 
0),
+                                                    
/*dst_type=*/DateTimeUtils::TimeType::SECOND),
+                  1758173447l);
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(1758173447001l, 0),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::MILLISECOND),
+            1758173447001l);
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(1758173447001l, 1000),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::MICROSECOND),
+            1758173447001001l);
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(1758173447001l, 1001),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::NANOSECOND),
+            1758173447001001001l);
+    }
+    {
+        ASSERT_EQ(DateTimeUtils::TimestampToInteger(Timestamp(-2493033748000l, 
0),
+                                                    
/*dst_type=*/DateTimeUtils::TimeType::SECOND),
+                  -2493033748l);
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(-2493033748001l, 0),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::MILLISECOND),
+            -2493033748001l);
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(-2493033748001l, 1000),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::MICROSECOND),
+            -2493033748000999l);
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(-2493033748001l, 1001),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::NANOSECOND),
+            -2493033748000998999l);
+    }
+
+    {
+        // 9999-12-31, cannot convert to nano second, which is overflow for 
int64
+        
ASSERT_EQ(DateTimeUtils::TimestampToInteger(Timestamp(253402252995000l, 0),
+                                                    
/*dst_type=*/DateTimeUtils::TimeType::SECOND),
+                  253402252995l);
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(253402252995001l, 0),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::MILLISECOND),
+            253402252995001l);
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(253402252995001l, 
1000),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::MICROSECOND),
+            253402252995001001l);
+    }
+    {
+        // 0000-01-01, cannot convert to nano second, which is overflow for 
int64
+        
ASSERT_EQ(DateTimeUtils::TimestampToInteger(Timestamp(-62167219200000l, 0),
+                                                    
/*dst_type=*/DateTimeUtils::TimeType::SECOND),
+                  -62167219200l);
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(-62167219200000l, 0),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::MILLISECOND),
+            -62167219200000l);
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(-62167219200000l, 
1000),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::MICROSECOND),
+            -62167219199999999l);
+    }
+
+    {
+        // test precision loss
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(1758173447001l, 1001),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::MICROSECOND),
+            1758173447001001l);
+        ASSERT_EQ(
+            DateTimeUtils::TimestampToInteger(Timestamp(1758173447001l, 1001),
+                                              
/*dst_type=*/DateTimeUtils::TimeType::MILLISECOND),
+            1758173447001l);
+        ASSERT_EQ(DateTimeUtils::TimestampToInteger(Timestamp(1758173447001l, 
1001),
+                                                    
/*dst_type=*/DateTimeUtils::TimeType::SECOND),
+                  1758173447l);
+    }
+}
+
+TEST(DateTimeUtilsTest, TestGetPrecisionFromType) {
+    auto ts_sec_type = arrow::timestamp(arrow::TimeUnit::type::SECOND);
+    auto ts_type = 
arrow::internal::checked_pointer_cast<arrow::TimestampType>(ts_sec_type);
+    ASSERT_EQ(DateTimeUtils::GetPrecisionFromType(ts_type), 0);
+
+    auto ts_milli_type = arrow::timestamp(arrow::TimeUnit::type::MILLI);
+    ts_type = 
arrow::internal::checked_pointer_cast<arrow::TimestampType>(ts_milli_type);
+    ASSERT_EQ(DateTimeUtils::GetPrecisionFromType(ts_type), 3);
+
+    auto ts_micro_type = arrow::timestamp(arrow::TimeUnit::type::MICRO);
+    ts_type = 
arrow::internal::checked_pointer_cast<arrow::TimestampType>(ts_micro_type);
+    ASSERT_EQ(DateTimeUtils::GetPrecisionFromType(ts_type), 6);
+
+    auto ts_nano_type = arrow::timestamp(arrow::TimeUnit::type::NANO);
+    ts_type = 
arrow::internal::checked_pointer_cast<arrow::TimestampType>(ts_nano_type);
+    ASSERT_EQ(DateTimeUtils::GetPrecisionFromType(ts_type), 9);
+}
+
+TEST(DateTimeUtilsTest, TestGetTimeTypeFromArrowType) {
+    auto ts_sec_type = arrow::timestamp(arrow::TimeUnit::type::SECOND);
+    auto ts_type = 
arrow::internal::checked_pointer_cast<arrow::TimestampType>(ts_sec_type);
+    ASSERT_EQ(DateTimeUtils::GetTimeTypeFromArrowType(ts_type), 
DateTimeUtils::TimeType::SECOND);
+
+    auto ts_milli_type = arrow::timestamp(arrow::TimeUnit::type::MILLI);
+    ts_type = 
arrow::internal::checked_pointer_cast<arrow::TimestampType>(ts_milli_type);
+    ASSERT_EQ(DateTimeUtils::GetTimeTypeFromArrowType(ts_type),
+              DateTimeUtils::TimeType::MILLISECOND);
+
+    auto ts_micro_type = arrow::timestamp(arrow::TimeUnit::type::MICRO);
+    ts_type = 
arrow::internal::checked_pointer_cast<arrow::TimestampType>(ts_micro_type);
+    ASSERT_EQ(DateTimeUtils::GetTimeTypeFromArrowType(ts_type),
+              DateTimeUtils::TimeType::MICROSECOND);
+
+    auto ts_nano_type = arrow::timestamp(arrow::TimeUnit::type::NANO);
+    ts_type = 
arrow::internal::checked_pointer_cast<arrow::TimestampType>(ts_nano_type);
+    ASSERT_EQ(DateTimeUtils::GetTimeTypeFromArrowType(ts_type),
+              DateTimeUtils::TimeType::NANOSECOND);
+}
+
+TEST(DateTimeUtilsTest, TestGetTypeFromPrecision) {
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();
+    {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::DataType> ts_type,
+                             DateTimeUtils::GetTypeFromPrecision(0, 
/*with_timezone=*/false));
+        
ASSERT_TRUE(ts_type->Equals(arrow::timestamp(arrow::TimeUnit::type::SECOND)));
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::DataType> ts_type,
+                             DateTimeUtils::GetTypeFromPrecision(0, 
/*with_timezone=*/true));
+        
ASSERT_TRUE(ts_type->Equals(arrow::timestamp(arrow::TimeUnit::type::SECOND, 
timezone)));
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::DataType> ts_type,
+                             DateTimeUtils::GetTypeFromPrecision(3, 
/*with_timezone=*/false));
+        
ASSERT_TRUE(ts_type->Equals(arrow::timestamp(arrow::TimeUnit::type::MILLI)));
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::DataType> ts_type,
+                             DateTimeUtils::GetTypeFromPrecision(3, 
/*with_timezone=*/true));
+        
ASSERT_TRUE(ts_type->Equals(arrow::timestamp(arrow::TimeUnit::type::MILLI, 
timezone)));
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::DataType> ts_type,
+                             DateTimeUtils::GetTypeFromPrecision(6, 
/*with_timezone=*/false));
+        
ASSERT_TRUE(ts_type->Equals(arrow::timestamp(arrow::TimeUnit::type::MICRO)));
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::DataType> ts_type,
+                             DateTimeUtils::GetTypeFromPrecision(6, 
/*with_timezone=*/true));
+        
ASSERT_TRUE(ts_type->Equals(arrow::timestamp(arrow::TimeUnit::type::MICRO, 
timezone)));
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::DataType> ts_type,
+                             DateTimeUtils::GetTypeFromPrecision(9, 
/*with_timezone=*/false));
+        
ASSERT_TRUE(ts_type->Equals(arrow::timestamp(arrow::TimeUnit::type::NANO)));
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::DataType> ts_type,
+                             DateTimeUtils::GetTypeFromPrecision(9, 
/*with_timezone=*/true));
+        
ASSERT_TRUE(ts_type->Equals(arrow::timestamp(arrow::TimeUnit::type::NANO, 
timezone)));
+    }
+    {
+        ASSERT_NOK_WITH_MSG(DateTimeUtils::GetTypeFromPrecision(4, 
/*with_timezone=*/true),
+                            "only support precision 0/3/6/9 in timestamp 
type");
+    }
+}
+
+TEST(DateTimeUtilsTest, TestGetLocalTimezoneName) {
+    std::string timezone = DateTimeUtils::GetLocalTimezoneName();
+    {
+        TimezoneGuard guard("US/Hawaii");
+        ASSERT_EQ(DateTimeUtils::GetLocalTimezoneName(), "US/Hawaii");
+    }
+    ASSERT_EQ(DateTimeUtils::GetLocalTimezoneName(), timezone);
+}
+
+TEST(DateTimeUtilsTest, TestGetCurrentLocalTimeUs) {
+    TimezoneGuard guard("Asia/Shanghai");
+    uint64_t utc_ts = DateTimeUtils::GetCurrentUTCTimeUs();
+    uint64_t local_ts = DateTimeUtils::GetCurrentLocalTimeUs().value();
+    ASSERT_GT(local_ts, utc_ts);
+    ASSERT_GE(local_ts - utc_ts, 28800000000l);
+}
+
+TEST(DateTimeUtilsTest, TestToLocalTimestamp) {
+    {
+        TimezoneGuard guard("Asia/Shanghai");
+        ASSERT_OK_AND_ASSIGN(Timestamp timestamp, 
DateTimeUtils::ToLocalTimestamp(
+                                                      
Timestamp::FromEpochMillis(1700000000123L)));
+        ASSERT_EQ(timestamp, Timestamp::FromEpochMillis(1700028800123L));
+    }
+    {
+        TimezoneGuard guard("UTC");
+        ASSERT_OK_AND_ASSIGN(Timestamp timestamp, 
DateTimeUtils::ToLocalTimestamp(
+                                                      
Timestamp::FromEpochMillis(1700000000123L)));
+        ASSERT_EQ(timestamp, Timestamp::FromEpochMillis(1700000000123L));
+    }
+}
+
+TEST(DateTimeUtilsTest, TestGetCurrentLocalHour) {
+    int32_t shanghai_hour = 0;
+    int32_t utc_hour = 0;
+    {
+        TimezoneGuard guard("Asia/Shanghai");
+        ASSERT_OK_AND_ASSIGN(shanghai_hour, 
DateTimeUtils::GetCurrentLocalHour());
+    }
+    {
+        TimezoneGuard guard("UTC");
+        ASSERT_OK_AND_ASSIGN(utc_hour, DateTimeUtils::GetCurrentLocalHour());
+    }
+    ASSERT_EQ((shanghai_hour - utc_hour + 24) % 24, 8);
+}
+
+TEST(DateTimeUtilsTest, TestToUTCTimestamp) {
+    TimezoneGuard guard("Asia/Shanghai");
+    {
+        Timestamp ts(0, 0);
+        ASSERT_OK_AND_ASSIGN(Timestamp utc_ts, 
DateTimeUtils::ToUTCTimestamp(ts));
+        ASSERT_EQ(utc_ts, Timestamp(-28800000l, 0));
+    }
+    {
+        // test precision loss for nano
+        Timestamp ts(0, 500);
+        ASSERT_OK_AND_ASSIGN(Timestamp utc_ts, 
DateTimeUtils::ToUTCTimestamp(ts));
+        ASSERT_EQ(utc_ts, Timestamp(-28800000l, 0));
+    }
+}
+TEST(DateTimeUtilsTest, TestGetArrowTimeUnitStr) {
+    ASSERT_EQ(DateTimeUtils::GetArrowTimeUnitStr(arrow::TimeUnit::SECOND), 
"SECOND");
+    ASSERT_EQ(DateTimeUtils::GetArrowTimeUnitStr(arrow::TimeUnit::MILLI), 
"MILLISECOND");
+    ASSERT_EQ(DateTimeUtils::GetArrowTimeUnitStr(arrow::TimeUnit::MICRO), 
"MICROSECOND");
+    ASSERT_EQ(DateTimeUtils::GetArrowTimeUnitStr(arrow::TimeUnit::NANO), 
"NANOSECOND");
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/jsonizable.h 
b/src/paimon/common/utils/jsonizable.h
new file mode 100644
index 0000000..9e2a395
--- /dev/null
+++ b/src/paimon/common/utils/jsonizable.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+
+#include "paimon/common/utils/rapidjson_util.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+
+#define JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(Type) \
+    friend class RapidJsonUtil;                  \
+    friend class Jsonizable;                     \
+    Type() = default;
+
+template <typename Derived>
+class Jsonizable {
+ public:
+    Jsonizable() = default;
+    virtual ~Jsonizable() = default;
+
+    virtual rapidjson::Value ToJson(rapidjson::Document::AllocatorType* 
allocator) const
+        noexcept(false) = 0;
+    virtual void FromJson(const rapidjson::Value& obj) noexcept(false) = 0;
+
+    Result<std::string> ToJsonString() const {
+        std::string json_str;
+        PAIMON_RETURN_NOT_OK(RapidJsonUtil::ToJsonString(*this, &json_str));
+        return json_str;
+    }
+    static Result<Derived> FromJsonString(const std::string& json_str) {
+        Derived obj;
+        PAIMON_RETURN_NOT_OK(RapidJsonUtil::FromJsonString(json_str, &obj));
+        return obj;
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/jsonizable_test.cpp 
b/src/paimon/common/utils/jsonizable_test.cpp
new file mode 100644
index 0000000..b5a5158
--- /dev/null
+++ b/src/paimon/common/utils/jsonizable_test.cpp
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/jsonizable.h"
+
+#include <algorithm>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon::test {
+
+TEST(JsonizableTest, TestNestedClass) {
+    class ClassA : public Jsonizable<ClassA> {
+     public:
+        bool operator==(const ClassA& other) const {
+            return vec_ == other.vec_ && string_ == other.string_ && map_ == 
other.map_;
+        }
+        rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) 
const
+            noexcept(false) override {
+            rapidjson::Value value(rapidjson::kObjectType);
+            value.AddMember("vec", RapidJsonUtil::SerializeValue(vec_, 
allocator).Move(),
+                            *allocator);
+            value.AddMember("string", RapidJsonUtil::SerializeValue(string_, 
allocator).Move(),
+                            *allocator);
+            value.AddMember("map_a", RapidJsonUtil::SerializeValue(map_, 
allocator).Move(),
+                            *allocator);
+            return value;
+        }
+        void FromJson(const rapidjson::Value& value) noexcept(false) override {
+            vec_ = 
RapidJsonUtil::DeserializeKeyValue<std::vector<double>>(value, "vec", vec_);
+            string_ = RapidJsonUtil::DeserializeKeyValue<std::string>(value, 
"string", string_);
+            map_ = RapidJsonUtil::DeserializeKeyValue<std::map<std::string, 
std::string>>(
+                value, "map_a", map_);
+        }
+
+     private:
+        JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(ClassA);
+
+        std::vector<double> vec_;
+        std::string string_;
+        std::map<std::string, std::string> map_;
+    };
+
+    class ClassB : public Jsonizable<ClassB> {
+     public:
+        bool operator==(const ClassB& other) const {
+            return a_ == other.a_ && a_vec_ == other.a_vec_ && f_ == other.f_ 
&& map_ == other.map_;
+        }
+        rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) 
const
+            noexcept(false) override {
+            rapidjson::Value obj(rapidjson::kObjectType);
+            obj.AddMember("ClassA", RapidJsonUtil::SerializeValue(a_, 
allocator).Move(),
+                          *allocator);
+            obj.AddMember("ClassA_vec", RapidJsonUtil::SerializeValue(a_vec_, 
allocator).Move(),
+                          *allocator);
+            obj.AddMember("float", RapidJsonUtil::SerializeValue(f_, 
allocator).Move(), *allocator);
+            obj.AddMember("map_b", RapidJsonUtil::SerializeValue(map_, 
allocator).Move(),
+                          *allocator);
+            return obj;
+        }
+        void FromJson(const rapidjson::Value& obj) noexcept(false) override {
+            a_ = RapidJsonUtil::DeserializeKeyValue<ClassA>(obj, "ClassA", a_);
+            a_vec_ =
+                RapidJsonUtil::DeserializeKeyValue<std::vector<ClassA>>(obj, 
"ClassA_vec", a_vec_);
+            f_ = RapidJsonUtil::DeserializeKeyValue<float>(obj, "float", f_);
+            map_ = RapidJsonUtil::DeserializeKeyValue<std::map<std::string, 
std::vector<int>>>(
+                obj, "map_b", map_);
+        }
+
+     private:
+        JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(ClassB);
+
+        ClassA a_;
+        std::vector<ClassA> a_vec_;
+        float f_;
+        std::map<std::string, std::vector<int>> map_;
+    };
+
+    ClassA obj_a1, obj_a2;
+    obj_a1.vec_ = {11.0, 12.0, 13.0, 14.0};
+    obj_a1.string_ = "string_value_1";
+    obj_a1.map_ = {{"10", "a1"}, {"11", "b1"}, {"12", "c1"}};
+
+    obj_a2.vec_ = {21.0, 22.0, 23.0, 24.0};
+    obj_a2.string_ = "string_value_2";
+    obj_a2.map_ = {{"20", "a2"}, {"21", "b2"}, {"22", "c2"}};
+
+    ClassB obj_b;
+    obj_b.a_.vec_ = {1.0, 2.0, 3.0, 4.0};
+    obj_b.a_.string_ = "string_value";
+    obj_b.a_.map_ = {{"0", "a"}, {"1", "b"}, {"2", "c"}};
+
+    obj_b.a_vec_.push_back(obj_a1);
+    obj_b.a_vec_.push_back(obj_a2);
+    obj_b.f_ = 10.5;
+    obj_b.map_ = {{"aa", {0, 1}}, {"bb", {1, 2}}, {"cc", {2, 3}}};
+
+    ASSERT_OK_AND_ASSIGN(std::string json_str, obj_b.ToJsonString());
+    ASSERT_OK_AND_ASSIGN(ClassB obj_b_2, ClassB::FromJsonString(json_str));
+    ASSERT_EQ(obj_b, obj_b_2);
+
+    // test invalid json_str
+    auto invalid_json_str = json_str.substr(0, json_str.length() / 2);
+    ASSERT_NOK_WITH_MSG(ClassB::FromJsonString(invalid_json_str), "deserialize 
failed");
+}
+
+TEST(JsonizableTest, TestUpgradeClass) {
+    class ClassA : public Jsonizable<ClassA> {
+     public:
+        bool operator==(const ClassA& other) const {
+            return vec_ == other.vec_ && string_ == other.string_;
+        }
+
+        rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) 
const
+            noexcept(false) override {
+            rapidjson::Value value(rapidjson::kObjectType);
+            value.AddMember("vec", RapidJsonUtil::SerializeValue(vec_, 
allocator).Move(),
+                            *allocator);
+            value.AddMember("string", RapidJsonUtil::SerializeValue(string_, 
allocator).Move(),
+                            *allocator);
+            return value;
+        }
+        void FromJson(const rapidjson::Value& value) noexcept(false) override {
+            vec_ = 
RapidJsonUtil::DeserializeKeyValue<std::vector<double>>(value, "vec", vec_);
+            string_ = RapidJsonUtil::DeserializeKeyValue<std::string>(value, 
"string", string_);
+        }
+
+     private:
+        JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(ClassA);
+
+        std::vector<double> vec_;
+        std::string string_;
+    };
+
+    // modify vec_ from vector<double> to vector<string>
+    class NewClassA : public Jsonizable<NewClassA> {
+     public:
+        rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) 
const
+            noexcept(false) override {
+            rapidjson::Value value(rapidjson::kObjectType);
+            value.AddMember("vec", RapidJsonUtil::SerializeValue(vec_, 
allocator).Move(),
+                            *allocator);
+            value.AddMember("string", RapidJsonUtil::SerializeValue(string_, 
allocator).Move(),
+                            *allocator);
+            return value;
+        }
+        void FromJson(const rapidjson::Value& value) noexcept(false) override {
+            vec_ = 
RapidJsonUtil::DeserializeKeyValue<std::vector<std::string>>(value, "vec", 
vec_);
+            string_ = RapidJsonUtil::DeserializeKeyValue<std::string>(value, 
"string", string_);
+        }
+
+     private:
+        JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(NewClassA);
+
+        std::vector<std::string> vec_;
+        std::string string_;
+    };
+
+    ClassA obj_a;
+    obj_a.vec_ = {1, 2, 3};
+    obj_a.string_ = "abcd";
+
+    ASSERT_OK_AND_ASSIGN(std::string json_str, obj_a.ToJsonString());
+    ASSERT_OK_AND_ASSIGN(ClassA obj_a_2, ClassA::FromJsonString(json_str));
+    ASSERT_EQ(obj_a, obj_a_2);
+
+    // test serialize with ClassA and deserialize with NewClassA
+    ASSERT_NOK_WITH_MSG(NewClassA::FromJsonString(json_str), "value must be 
string");
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/options_utils.h 
b/src/paimon/common/utils/options_utils.h
new file mode 100644
index 0000000..90b30b5
--- /dev/null
+++ b/src/paimon/common/utils/options_utils.h
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cxxabi.h>
+
+#include <cassert>
+#include <cstdint>
+#include <cstdlib>
+#include <map>
+#include <optional>
+#include <string>
+#include <type_traits>
+#include <typeinfo>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+class OptionsUtils {
+ public:
+    template <typename T>
+    using is_supported_type =
+        std::disjunction<std::is_trivially_copyable<T>, std::is_same<T, 
std::string>>;
+
+    OptionsUtils() = delete;
+    ~OptionsUtils() = delete;
+
+    template <typename T>
+    static Result<T> GetValueFromMap(const std::map<std::string, std::string>& 
key_value_map,
+                                     const std::string& key, const T& 
default_value) {
+        auto value = GetValueFromMap<T>(key_value_map, key);
+        if (value.ok()) {
+            return value.value();
+        } else if (value.status().IsNotExist()) {
+            return default_value;
+        }
+        return value.status();
+    }
+
+    template <typename T>
+    static Result<T> GetValueFromMap(const std::map<std::string, std::string>& 
key_value_map,
+                                     const std::string& key) {
+        static_assert(is_supported_type<T>::value, "T must be trivially 
copyable or string");
+        auto iter = key_value_map.find(key);
+        if (iter == key_value_map.end()) {
+            return Status::NotExist(fmt::format("key {} does not exist in 
map", key));
+        }
+        const auto& value_str = iter->second;
+        std::optional<T> value = StringUtils::StringToValue<T>(value_str);
+        if (value == std::nullopt) {
+            return Status::Invalid(fmt::format("convert key {}, value {} to {} 
failed", key,
+                                               value_str, GetTypeName<T>()));
+        }
+        return value.value();
+    }
+
+    /// Fetch options with specific prefix and remove prefix for key.
+    static std::map<std::string, std::string> FetchOptionsWithPrefix(
+        const std::string& prefix, const std::map<std::string, std::string>& 
options) {
+        std::map<std::string, std::string> options_with_prefix;
+        int64_t prefix_len = prefix.size();
+        for (const auto& [key, value] : options) {
+            if (StringUtils::StartsWith(key, prefix)) {
+                options_with_prefix[key.substr(prefix_len)] = value;
+            }
+        }
+        return options_with_prefix;
+    }
+
+    template <typename T>
+    static std::string GetTypeName() {
+        int32_t status;
+        char* demangled = abi::__cxa_demangle(typeid(T).name(), nullptr, 
nullptr, &status);
+        if (status == 0) {
+            std::string result(demangled);
+            free(demangled);
+            return result;
+        }
+        assert(demangled == nullptr);
+        return typeid(T).name();
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/options_utils_test.cpp 
b/src/paimon/common/utils/options_utils_test.cpp
new file mode 100644
index 0000000..7a09a98
--- /dev/null
+++ b/src/paimon/common/utils/options_utils_test.cpp
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/options_utils.h"
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(OptionsUtilsTest, TestGetValueFromMap) {
+    std::map<std::string, std::string> key_value_map;
+    key_value_map["key_int"] = "10";
+    key_value_map["key_bool"] = "true";
+    key_value_map["key_int16"] = "100";
+    key_value_map["key_double"] = "4.5E10";
+    // invalid
+    key_value_map["key_bool2"] = "true1";
+    key_value_map["key_int8"] = "500";
+    key_value_map["key_int64"] = "ab";
+
+    ASSERT_OK_AND_ASSIGN(auto int32_value,
+                         OptionsUtils::GetValueFromMap<int32_t>(key_value_map, 
"key_int"));
+    ASSERT_EQ(10, int32_value);
+    ASSERT_OK_AND_ASSIGN(auto bool_value,
+                         OptionsUtils::GetValueFromMap<bool>(key_value_map, 
"key_bool"));
+    ASSERT_TRUE(bool_value);
+    ASSERT_OK_AND_ASSIGN(auto int16_value,
+                         OptionsUtils::GetValueFromMap<int16_t>(key_value_map, 
"key_int16"));
+    ASSERT_EQ(100, int16_value);
+    ASSERT_OK_AND_ASSIGN(auto double_value,
+                         OptionsUtils::GetValueFromMap<double>(key_value_map, 
"key_double"));
+    ASSERT_NEAR(4.5E10, double_value, 0.00001);
+    ASSERT_NOK_WITH_MSG(OptionsUtils::GetValueFromMap<bool>(key_value_map, 
"key_bool2"),
+                        "convert key key_bool2, value true1 to bool failed");
+    ASSERT_NOK_WITH_MSG(OptionsUtils::GetValueFromMap<int8_t>(key_value_map, 
"key_int8"),
+                        "convert key key_int8, value 500 to signed char 
failed");
+    ASSERT_NOK_WITH_MSG(OptionsUtils::GetValueFromMap<int64_t>(key_value_map, 
"key_int64"),
+                        "convert key key_int64, value ab to long failed");
+    ASSERT_NOK_WITH_MSG(OptionsUtils::GetValueFromMap<int64_t>(key_value_map, 
"key_int64", 10),
+                        "convert key key_int64, value ab to long failed");
+
+    ASSERT_OK_AND_ASSIGN(
+        auto nonexist, OptionsUtils::GetValueFromMap<int32_t>(key_value_map, 
"key_nonexist", 233));
+    ASSERT_EQ(233, nonexist);
+    ASSERT_OK_AND_ASSIGN(auto empty,
+                         OptionsUtils::GetValueFromMap<int32_t>(key_value_map, 
"", 999));
+    ASSERT_EQ(999, empty);
+}
+
+TEST(OptionsUtilsTest, TestFetchOptionsWithPrefix) {
+    std::map<std::string, std::string> options = {{"key1", "value1"}, 
{"test.key2", "value2"}};
+    auto new_options = OptionsUtils::FetchOptionsWithPrefix("test.", options);
+    std::map<std::string, std::string> expected = {{"key2", "value2"}};
+    ASSERT_EQ(expected, new_options);
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/path_util.cpp 
b/src/paimon/common/utils/path_util.cpp
new file mode 100644
index 0000000..a977ab8
--- /dev/null
+++ b/src/paimon/common/utils/path_util.cpp
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/path_util.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/common/utils/uuid.h"
+#include "paimon/status.h"
+
+namespace paimon {
+std::string Path::ToString() const {
+    std::string ret;
+    if (!scheme.empty()) {
+        ret = scheme + ":";
+    }
+    if (!authority.empty()) {
+        ret += "//";
+        ret += authority;
+    }
+    if (!path.empty()) {
+        ret += path;
+    }
+    return ret;
+}
+
+std::string PathUtil::JoinPath(const std::string& path, const std::string& 
name) noexcept {
+    if (path.empty()) {
+        return name;
+    }
+    if (name.empty()) {
+        return path;
+    }
+    int32_t slash_cnt = (*(path.rbegin()) == '/') + (*(name.begin()) == '/');
+    if (!slash_cnt) {
+        return path + "/" + name;
+    } else if (slash_cnt == 2) {
+        return path + name.substr(1);
+    }
+    return path + name;
+}
+
+std::string PathUtil::NormalizeInnerPath(const std::string& path) noexcept {
+    if (path.empty()) {
+        return path;
+    }
+    std::string ret;
+    ret.reserve(path.size());
+    char last_char = path[0];
+    ret.append(1, last_char);
+    for (size_t i = 1; i < path.size(); ++i) {
+        if (last_char == '/' && path[i] == '/') {
+            continue;
+        }
+        last_char = path[i];
+        ret.append(1, last_char);
+    }
+    TrimLastDelim(&ret);
+    return ret;
+}
+
+Result<std::string> PathUtil::NormalizePath(const std::string& path_str) 
noexcept {
+    PAIMON_ASSIGN_OR_RAISE(Path path, ToPath(path_str));
+    return path.ToString();
+}
+
+Result<Path> PathUtil::ToPath(const std::string& path) noexcept {
+    // TODO(yonghao.fyh): support Windows Driver
+    if (path.empty()) {
+        return Status::Invalid("path is an empty string.");
+    }
+    std::string scheme;
+    std::string authority;
+    int32_t start = 0;
+
+    // parse scheme
+    auto colon = path.find(':');
+    auto slash = path.find('/');
+    if ((colon != std::string::npos) && (slash == std::string::npos || colon < 
slash)) {
+        // has a scheme
+        scheme.append(path, 0, colon);
+        start = colon + 1;
+    }
+
+    // parse authority
+    if (StringUtils::StartsWith(path, "//", start) && (path.length() - start > 
2)) {
+        // has authority
+        int32_t next_slash = path.find('/', start + 2);
+        int32_t auth_end = next_slash > 0 ? next_slash : path.length();
+        authority = path.substr(start + 2, auth_end - start - 2);
+        start = auth_end;
+    }
+
+    // parse path in uri
+    std::string inner_path = NormalizeInnerPath(path.substr(start));
+    return Path(scheme, authority, inner_path);
+}
+
+std::string PathUtil::GetParentDirPath(const std::string& path) noexcept {
+    std::string::const_reverse_iterator it;
+    for (it = path.rbegin(); it != path.rend() && *it == '/'; it++) {
+    }
+    for (; it != path.rend() && *it != '/'; it++) {
+    }
+    for (; it != path.rend() && *it == '/'; it++) {
+    }
+    return path.substr(0, path.rend() - it);
+}
+
+std::string PathUtil::GetName(const std::string& path) noexcept {
+    std::string dir_path = path;
+    TrimLastDelim(&dir_path);
+    std::string::const_reverse_iterator it;
+    for (it = dir_path.rbegin(); it != dir_path.rend() && *it != '/'; it++) {
+    }
+    return dir_path.substr(dir_path.rend() - it);
+}
+
+void PathUtil::TrimLastDelim(std::string* dir_path) noexcept {
+    if (dir_path == nullptr || dir_path->empty()) {
+        return;
+    }
+    if (dir_path->length() > 1 && *(dir_path->rbegin()) == '/') {
+        dir_path->erase(dir_path->size() - 1, 1);
+    }
+}
+
+Result<std::string> PathUtil::CreateTempPath(const std::string& path) noexcept 
{
+    std::string uuid;
+    if (!UUID::Generate(&uuid)) {
+        return Status::Invalid("generate uuid failed");
+    }
+    return JoinPath(GetParentDirPath(path), fmt::format(".{}.{}.tmp", 
GetName(path), uuid));
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/path_util.h 
b/src/paimon/common/utils/path_util.h
new file mode 100644
index 0000000..b893e76
--- /dev/null
+++ b/src/paimon/common/utils/path_util.h
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+struct PAIMON_EXPORT Path {
+    Path(const std::string& _scheme, const std::string& _authority, const 
std::string& _path)
+        : scheme(_scheme), authority(_authority), path(_path) {}
+
+    std::string ToString() const;
+
+    std::string scheme;
+    std::string authority;
+    std::string path;
+};
+
+class PAIMON_EXPORT PathUtil {
+ public:
+    PathUtil() = delete;
+    ~PathUtil() = delete;
+
+    static std::string JoinPath(const std::string& path, const std::string& 
name) noexcept;
+    // TODO(jinli.zjw): should pass `Path.path` and normalize; otherwise if 
path is
+    // "oss://bucket1/", GetParentDirPath will return "oss:"
+    static std::string GetParentDirPath(const std::string& path) noexcept;
+    static std::string GetName(const std::string& path) noexcept;
+    static void TrimLastDelim(std::string* dir_path) noexcept;
+    static Result<std::string> CreateTempPath(const std::string& path) 
noexcept;
+    static Result<Path> ToPath(const std::string& path) noexcept;
+    static Result<std::string> NormalizePath(const std::string& path) noexcept;
+
+ private:
+    static std::string NormalizeInnerPath(const std::string& path) noexcept;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/path_util_test.cpp 
b/src/paimon/common/utils/path_util_test.cpp
new file mode 100644
index 0000000..8e7ed4c
--- /dev/null
+++ b/src/paimon/common/utils/path_util_test.cpp
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/path_util.h"
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(PathUtilsTest, TestJoinPath) {
+    ASSERT_EQ("/tmp/test_path/test", PathUtil::JoinPath("/tmp/test_path/test", 
""));
+}
+
+TEST(PathUtilsTest, TestGetParentDirPath) {
+    ASSERT_EQ("/tmp/test_path", 
PathUtil::GetParentDirPath("/tmp/test_path/test"));
+    ASSERT_EQ("/tmp/test_path", 
PathUtil::GetParentDirPath("/tmp/test_path/test/"));
+}
+
+TEST(PathUtilsTest, TestNormalizePathWithEmptyString) {
+    std::string test_path = "";
+    ASSERT_NOK_WITH_MSG(PathUtil::NormalizePath(test_path), "path is an empty 
string.");
+}
+
+TEST(PathUtilsTest, TestNormalizePathWithNoScheme) {
+    std::string test_path = "//tmp////index";
+    ASSERT_OK_AND_ASSIGN(std::string normalize_path, 
PathUtil::NormalizePath(test_path));
+    std::string expected_path = "//tmp/index";
+    ASSERT_EQ(normalize_path, expected_path);
+}
+
+TEST(PathUtilsTest, TestNormalizePath) {
+    {
+        // test with no authority
+        std::string test_path = "hdfs:///tmp/test_path/test_subdir";
+        ASSERT_OK_AND_ASSIGN(std::string normalize_path, 
PathUtil::NormalizePath(test_path));
+        std::string expected_path = "hdfs:/tmp/test_path/test_subdir";
+        ASSERT_EQ(normalize_path, expected_path);
+    }
+    {
+        // test with authority
+        std::string test_path = "hdfs://tmp/test_path//test_subdir/";
+        ASSERT_OK_AND_ASSIGN(std::string normalize_path, 
PathUtil::NormalizePath(test_path));
+        std::string expected_path = "hdfs://tmp/test_path/test_subdir";
+        ASSERT_EQ(normalize_path, expected_path);
+    }
+    {
+        // test with no authority
+        std::string test_path = "hdfs:///tmp/test_path//test_subdir/";
+        ASSERT_OK_AND_ASSIGN(std::string normalize_path, 
PathUtil::NormalizePath(test_path));
+        std::string expected_path = "hdfs:/tmp/test_path/test_subdir";
+        ASSERT_EQ(normalize_path, expected_path);
+    }
+}
+
+TEST(PathUtilsTest, TestTrimLastDelim) {
+    {
+        std::string path = "hdfs://auth/test_path/test_subdir";
+        PathUtil::TrimLastDelim(&path);
+        ASSERT_EQ(path, "hdfs://auth/test_path/test_subdir");
+    }
+    {
+        std::string path = "hdfs://auth/test_path/test_subdir/";
+        PathUtil::TrimLastDelim(&path);
+        ASSERT_EQ(path, "hdfs://auth/test_path/test_subdir");
+    }
+    {
+        std::string path = "/";
+        PathUtil::TrimLastDelim(&path);
+        ASSERT_EQ(path, "/");
+    }
+    {
+        std::string path = "";
+        PathUtil::TrimLastDelim(&path);
+        ASSERT_EQ(path, "");
+    }
+}
+
+TEST(PathUtilsTest, TestToPath) {
+    {
+        std::string test_path = "";
+        ASSERT_NOK_WITH_MSG(PathUtil::ToPath(test_path), "path is an empty 
string.");
+    }
+    {
+        std::string test_path = "FILE:///tmp";
+        ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path));
+        ASSERT_EQ(path.scheme, "FILE");
+        ASSERT_EQ(path.authority, "");
+        ASSERT_EQ(path.path, "/tmp");
+        ASSERT_EQ(path.ToString(), "FILE:/tmp");
+    }
+    {
+        std::string test_path = "dfs://tmp/index";
+        ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path));
+        ASSERT_EQ(path.scheme, "dfs");
+        ASSERT_EQ(path.authority, "tmp");
+        ASSERT_EQ(path.path, "/index");
+        ASSERT_EQ(path.ToString(), "dfs://tmp/index");
+    }
+    {
+        std::string test_path = "http://example.com:8080/api";;
+        ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path));
+        ASSERT_EQ(path.scheme, "http");
+        ASSERT_EQ(path.authority, "example.com:8080");
+        ASSERT_EQ(path.path, "/api");
+        ASSERT_EQ(path.ToString(), "http://example.com:8080/api";);
+    }
+    {
+        std::string test_path = "/tmp/index";
+        ASSERT_OK_AND_ASSIGN(Path path, PathUtil::ToPath(test_path));
+        ASSERT_EQ(path.scheme, "");
+        ASSERT_EQ(path.authority, "");
+        ASSERT_EQ(path.path, "/tmp/index");
+        ASSERT_EQ(path.ToString(), "/tmp/index");
+    }
+}
+
+TEST(PathUtilsTest, TestGetName) {
+    ASSERT_EQ("test", PathUtil::GetName("hdfs://tmp/test_path/test/"));
+    ASSERT_EQ("test", PathUtil::GetName("hdfs://tmp/test_path/test"));
+    ASSERT_EQ("test", PathUtil::GetName("test"));
+}
+
+TEST(PathUtilsTest, TestCreateTempPath) {
+    // tmp path: hdfs://tmp/test_path/.test.<uuid>.tmp;
+    ASSERT_OK_AND_ASSIGN(std::string tmp_path,
+                         
PathUtil::CreateTempPath("hdfs://tmp/test_path/test"));
+    ASSERT_EQ("hdfs://tmp/test_path", PathUtil::GetParentDirPath(tmp_path));
+    auto tmp_name = PathUtil::GetName(tmp_path);
+    ASSERT_TRUE(StringUtils::StartsWith(tmp_name, ".test."));
+    ASSERT_TRUE(StringUtils::EndsWith(tmp_name, ".tmp"));
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/rapidjson_util.h 
b/src/paimon/common/utils/rapidjson_util.h
new file mode 100644
index 0000000..d413f25
--- /dev/null
+++ b/src/paimon/common/utils/rapidjson_util.h
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <map>
+#include <optional>
+#include <stdexcept>
+#include <string>
+
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/status.h"
+#include "paimon/traits.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/encodings.h"
+#include "rapidjson/prettywriter.h"
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace paimon {
+
+class RapidJsonUtil {
+ public:
+    RapidJsonUtil() = delete;
+    ~RapidJsonUtil() = delete;
+
+    // if T is custom type, T must have ToJson()
+    template <typename T>
+    static inline Status ToJsonString(const T& obj, std::string* json_str) {
+        rapidjson::Document doc;
+        rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
+        rapidjson::Value value;
+        try {
+            if constexpr (is_pointer<T>::value) {
+                value = obj->ToJson(&allocator);
+            } else if constexpr (std::is_same_v<T, std::map<std::string, 
std::string>>) {
+                *json_str = MapToJsonString(obj);
+                return Status::OK();
+            } else {
+                value = obj.ToJson(&allocator);
+            }
+        } catch (const std::invalid_argument& e) {
+            return Status::Invalid("json serialize failed:", e.what());
+        } catch (...) {
+            return Status::Invalid("json serialize failed, unknown error");
+        }
+        if (!ToJson(value, json_str)) {
+            return Status::Invalid("serialize failed");
+        }
+        return Status::OK();
+    }
+
+    // if T is custom type, T must have FromJson()
+    template <typename T>
+    static inline Status FromJsonString(const std::string& json_str, T* obj) {
+        if (!obj) {
+            return Status::Invalid("deserialize failed: obj is nullptr");
+        }
+        if constexpr (std::is_same_v<T, std::map<std::string, std::string>>) {
+            PAIMON_ASSIGN_OR_RAISE(*obj, MapFromJsonString(json_str));
+        } else {
+            rapidjson::Document doc;
+            if (!FromJson(json_str, &doc)) {
+                return Status::Invalid("deserialize failed: ", json_str);
+            }
+            try {
+                obj->FromJson(doc);
+            } catch (const std::invalid_argument& e) {
+                return Status::Invalid("deserialize failed, possibly type 
incompatible: ",
+                                       e.what());
+            } catch (...) {
+                return Status::Invalid("deserialize failed, reason unknown: ", 
json_str);
+            }
+        }
+        return Status::OK();
+    }
+
+    // if T is std::nullopt, will use rapid_json null_value
+    template <typename T>
+    static rapidjson::Value SerializeValue(const T& obj,
+                                           rapidjson::Document::AllocatorType* 
allocator);
+
+    // condition1 : has key & value -> return value
+    // condition2 : no key or null value -> return default value
+    template <typename T>
+    static T DeserializeKeyValue(const rapidjson::Value& value, const 
std::string& key,
+                                 const T& default_value);
+
+    // condition1 : has key & value -> return value
+    // condition2 : no key or null value, T is optional -> return std::nullopt
+    // condition3 : no key or null value, T is not optional -> throw exception
+    template <typename T>
+    static T DeserializeKeyValue(const rapidjson::Value& value, const 
std::string& key);
+
+    template <typename T>
+    static T DeserializeValue(const rapidjson::Value& value);
+
+ private:
+    static inline bool ToJson(const rapidjson::Value& value, std::string* 
json_str) {
+        assert(json_str);
+        rapidjson::StringBuffer buffer;
+        rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
+        if (!value.Accept(writer)) {
+            return false;
+        }
+        *json_str = buffer.GetString();
+        return true;
+    }
+
+    static inline bool FromJson(const std::string& json_str, 
rapidjson::Document* doc) {
+        doc->Parse(json_str.c_str());
+        if (doc->HasParseError()) {
+            return false;
+        }
+        return true;
+    }
+
+    template <typename T>
+    static rapidjson::Value SerializeMap(const T& map,
+                                         rapidjson::Document::AllocatorType* 
allocator);
+
+    template <typename T>
+    static rapidjson::Value SerializeVector(const T& vec,
+                                            
rapidjson::Document::AllocatorType* allocator);
+
+    template <typename T>
+    static T DeserializeVector(const rapidjson::Value& value);
+
+    template <typename T>
+    static T DeserializeMap(const rapidjson::Value& value);
+
+    template <typename T>
+    static T GetValue(const rapidjson::Value& value);
+
+    static std::string MapToJsonString(const std::map<std::string, 
std::string>& map) {
+        rapidjson::Document d;
+        d.SetObject();
+        rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+
+        for (const auto& kv : map) {
+            d.AddMember(rapidjson::Value(kv.first.c_str(), allocator),
+                        rapidjson::Value(kv.second.c_str(), allocator), 
allocator);
+        }
+
+        rapidjson::StringBuffer buffer;
+        rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+        d.Accept(writer);
+
+        return buffer.GetString();
+    }
+    static Result<std::map<std::string, std::string>> MapFromJsonString(
+        const std::string& json_str) {
+        rapidjson::Document doc;
+        doc.Parse(json_str.c_str());
+        if (doc.HasParseError() || !doc.IsObject()) {
+            return Status::Invalid("deserialize failed: parse error or not 
JSON object: ",
+                                   json_str);
+        }
+
+        std::map<std::string, std::string> result;
+        for (auto it = doc.MemberBegin(); it != doc.MemberEnd(); ++it) {
+            if (!it->name.IsString() || !it->value.IsString()) {
+                return Status::Invalid(
+                    "deserialize failed: non-string key or value in JSON 
object: ", json_str);
+            }
+            result[it->name.GetString()] = it->value.GetString();
+        }
+        return result;
+    }
+};
+
+template <typename T>
+inline rapidjson::Value RapidJsonUtil::SerializeValue(
+    const T& obj, rapidjson::Document::AllocatorType* allocator) {
+    if constexpr (is_optional<T>::value) {
+        if (obj == std::nullopt) {
+            rapidjson::Value null_value;
+            null_value.SetNull();
+            return null_value;
+        } else {
+            return SerializeValue(obj.value(), allocator);
+        }
+    } else {
+        if constexpr (std::is_same_v<T, std::string>) {
+            return rapidjson::Value(obj.c_str(), *allocator);
+        } else if constexpr (std::is_arithmetic_v<T>) {
+            return rapidjson::Value(obj);
+        } else if constexpr (is_map<T>::value) {
+            return SerializeMap(obj, allocator);
+        } else if constexpr (is_vector<T>::value) {
+            return SerializeVector(obj, allocator);
+        } else {
+            // custom type
+            return obj.ToJson(allocator);
+        }
+    }
+}
+
+template <typename T>
+inline rapidjson::Value RapidJsonUtil::SerializeMap(const T& map,
+                                                    
rapidjson::Document::AllocatorType* allocator) {
+    rapidjson::Value value(rapidjson::kObjectType);
+    using K = typename T::key_type;
+    for (const auto& kv : map) {
+        rapidjson::Value key;
+        if constexpr (std::is_same_v<K, std::string>) {
+            key = SerializeValue(kv.first, allocator);
+        } else if constexpr (std::is_same_v<T, bool>) {
+            throw std::invalid_argument("map key cannot be bool");
+        } else {
+            std::string key_str = std::to_string(kv.first);
+            key = SerializeValue(key_str, allocator);
+        }
+        rapidjson::Value val = SerializeValue(kv.second, allocator);
+        value.AddMember(key, val, *allocator);
+    }
+    return value;
+}
+
+template <typename T>
+inline rapidjson::Value RapidJsonUtil::SerializeVector(
+    const T& vec, rapidjson::Document::AllocatorType* allocator) {
+    rapidjson::Value value(rapidjson::kArrayType);
+    using V = typename T::value_type;
+    for (const V& item : vec) {
+        value.PushBack(SerializeValue(item, allocator), *allocator);
+    }
+    return value;
+}
+
+template <typename T>
+inline T RapidJsonUtil::DeserializeKeyValue(const rapidjson::Value& value, 
const std::string& key) {
+    if (!value.IsObject()) {
+        throw std::invalid_argument("value must be an object");
+    }
+    if constexpr (is_optional<T>::value) {
+        if (!value.HasMember(key.c_str()) || value[key].IsNull()) {
+            return std::nullopt;
+        } else {
+            return DeserializeValue<typename T::value_type>(value[key]);
+        }
+    } else {
+        if (!value.HasMember(key.c_str()) || value[key].IsNull()) {
+            throw std::invalid_argument("key must exist");
+        }
+        return DeserializeValue<T>(value[key]);
+    }
+}
+
+template <typename T>
+inline T RapidJsonUtil::DeserializeKeyValue(const rapidjson::Value& value, 
const std::string& key,
+                                            const T& default_value) {
+    if (!value.IsObject()) {
+        throw std::invalid_argument("value must be an object");
+    }
+    if (!value.HasMember(key.c_str()) || value[key].IsNull()) {
+        return default_value;
+    }
+    if constexpr (is_optional<T>::value) {
+        return DeserializeValue<typename T::value_type>(value[key]);
+    } else {
+        return DeserializeValue<T>(value[key]);
+    }
+}
+
+template <typename T>
+inline T RapidJsonUtil::DeserializeValue(const rapidjson::Value& value) {
+    if constexpr (is_vector<T>::value) {
+        return DeserializeVector<T>(value);
+    } else if constexpr (is_map<T>::value) {
+        return DeserializeMap<T>(value);
+    } else {
+        // arithmetic or string or custom type
+        return GetValue<T>(value);
+    }
+}
+
+template <typename T>
+inline T RapidJsonUtil::DeserializeVector(const rapidjson::Value& value) {
+    if (!value.IsArray()) {
+        throw std::invalid_argument("value must be an array");
+    }
+    T obj;
+    obj.reserve(value.Size());
+    using V = typename T::value_type;
+    for (const auto& item : value.GetArray()) {
+        obj.push_back(DeserializeValue<V>(item));
+    }
+    return obj;
+}
+
+template <typename T>
+inline T RapidJsonUtil::DeserializeMap(const rapidjson::Value& value) {
+    if (!value.IsObject()) {
+        throw std::invalid_argument("value must be an object");
+    }
+    using K = typename T::key_type;
+    using V = typename T::mapped_type;
+    T obj;
+    for (auto it = value.MemberBegin(); it != value.MemberEnd(); ++it) {
+        K key;
+        if constexpr (std::is_same_v<K, std::string>) {
+            key = DeserializeValue<K>(it->name);
+        } else {
+            auto key_str = DeserializeValue<std::string>(it->name);
+            auto optional_key = StringUtils::StringToValue<K>(key_str);
+            if (!optional_key) {
+                throw std::invalid_argument("key cannot be parse from string");
+            }
+            key = optional_key.value();
+        }
+        obj.emplace(key, DeserializeValue<V>(it->value));
+    }
+    return obj;
+}
+
+template <>
+inline bool RapidJsonUtil::GetValue<bool>(const rapidjson::Value& value) {
+    if (!value.IsBool()) {
+        throw std::invalid_argument("value must be bool");
+    }
+    return value.GetBool();
+}
+
+template <>
+inline int8_t RapidJsonUtil::GetValue<int8_t>(const rapidjson::Value& value) {
+    if (!value.IsInt()) {
+        throw std::invalid_argument("value must be int");
+    }
+    return static_cast<int8_t>(value.GetInt());
+}
+
+template <>
+inline uint8_t RapidJsonUtil::GetValue<uint8_t>(const rapidjson::Value& value) 
{
+    if (!value.IsUint()) {
+        throw std::invalid_argument("value must be uint");
+    }
+    return static_cast<uint8_t>(value.GetUint());
+}
+
+template <>
+inline int16_t RapidJsonUtil::GetValue<int16_t>(const rapidjson::Value& value) 
{
+    if (!value.IsInt()) {
+        throw std::invalid_argument("value must be int");
+    }
+    return static_cast<int16_t>(value.GetInt());
+}
+
+template <>
+inline uint16_t RapidJsonUtil::GetValue<uint16_t>(const rapidjson::Value& 
value) {
+    if (!value.IsUint()) {
+        throw std::invalid_argument("value must be uint");
+    }
+    return static_cast<uint16_t>(value.GetUint());
+}
+
+template <>
+inline int32_t RapidJsonUtil::GetValue<int32_t>(const rapidjson::Value& value) 
{
+    if (!value.IsInt()) {
+        throw std::invalid_argument("value must be int");
+    }
+    return value.GetInt();
+}
+
+template <>
+inline uint32_t RapidJsonUtil::GetValue<uint32_t>(const rapidjson::Value& 
value) {
+    if (!value.IsUint()) {
+        throw std::invalid_argument("value must be uint");
+    }
+    return value.GetUint();
+}
+
+template <>
+inline int64_t RapidJsonUtil::GetValue<int64_t>(const rapidjson::Value& value) 
{
+    if (!value.IsInt64()) {
+        throw std::invalid_argument("value must be int64");
+    }
+    return value.GetInt64();
+}
+
+template <>
+inline uint64_t RapidJsonUtil::GetValue<uint64_t>(const rapidjson::Value& 
value) {
+    if (!value.IsUint64()) {
+        throw std::invalid_argument("value must be uint64");
+    }
+    return value.GetUint64();
+}
+
+template <>
+inline double RapidJsonUtil::GetValue<double>(const rapidjson::Value& value) {
+    if (!value.IsDouble()) {
+        throw std::invalid_argument("value must be double");
+    }
+    return value.GetDouble();
+}
+
+template <>
+inline float RapidJsonUtil::GetValue<float>(const rapidjson::Value& value) {
+    if (!value.IsDouble()) {
+        throw std::invalid_argument("value must be double");
+    }
+    return static_cast<float>(value.GetDouble());
+}
+
+template <>
+inline std::string RapidJsonUtil::GetValue<std::string>(const 
rapidjson::Value& value) {
+    if (!value.IsString()) {
+        throw std::invalid_argument("value must be string");
+    }
+    return std::string(value.GetString(), value.GetStringLength());
+}
+
+template <typename T>
+inline T RapidJsonUtil::GetValue(const rapidjson::Value& value) {
+    // custom type
+    T obj;
+    obj.FromJson(value);
+    return obj;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/rapidjson_util_test.cpp 
b/src/paimon/common/utils/rapidjson_util_test.cpp
new file mode 100644
index 0000000..c5011d9
--- /dev/null
+++ b/src/paimon/common/utils/rapidjson_util_test.cpp
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/rapidjson_util.h"
+
+#include <algorithm>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon::test {
+
+TEST(RapidJsonUtilTest, TestSerializeAndDeserialize) {
+    // serialize
+    rapidjson::Document doc;
+    doc.SetObject();
+    rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
+    // string
+    std::string str_value = "John";
+    doc.AddMember("name", RapidJsonUtil::SerializeValue(str_value, 
&allocator).Move(), allocator);
+    // int
+    int32_t int_value = 30;
+    doc.AddMember("age", RapidJsonUtil::SerializeValue(int_value, 
&allocator).Move(), allocator);
+    // vector
+    std::vector<int> vector_value = {7, 12};
+    doc.AddMember("vector_value", RapidJsonUtil::SerializeValue(vector_value, 
&allocator).Move(),
+                  allocator);
+    // map
+    std::map<std::string, double> map_value = {{"a", 0.2}, {"b", 1.2}};
+    doc.AddMember("map_value", RapidJsonUtil::SerializeValue(map_value, 
&allocator).Move(),
+                  allocator);
+
+    // vector of vector
+    std::vector<std::vector<int>> vector_of_vector = {{7, 12}, {27, 45}};
+    doc.AddMember("vector_of_vector",
+                  RapidJsonUtil::SerializeValue(vector_of_vector, 
&allocator).Move(), allocator);
+
+    // vector of map
+    std::vector<std::map<std::string, double>> vector_of_map = {{{{"a", 0.2}, 
{"b", 1.2}}},
+                                                                {{"c", 2.2}, 
{"d", 3.2}}};
+    doc.AddMember("vector_of_map", 
RapidJsonUtil::SerializeValue(vector_of_map, &allocator).Move(),
+                  allocator);
+
+    // map of vector
+    std::map<std::string, std::vector<int>> map_of_vector = {{"aa", {7, 12}}, 
{"bb", {27, 45}}};
+    doc.AddMember("map_of_vector", 
RapidJsonUtil::SerializeValue(map_of_vector, &allocator).Move(),
+                  allocator);
+
+    std::optional<int64_t> null_value;
+    doc.AddMember("null_value", RapidJsonUtil::SerializeValue(null_value, 
&allocator).Move(),
+                  allocator);
+
+    std::optional<std::string> optional_value("abcd");
+    doc.AddMember("optional_value",
+                  RapidJsonUtil::SerializeValue(optional_value, 
&allocator).Move(), allocator);
+
+    // map with int key (not string key, will convert in util)
+    std::map<int32_t, int64_t> map_with_int_key = {{100, 1000}, {200, 2000}};
+    doc.AddMember("map_with_int_key",
+                  RapidJsonUtil::SerializeValue(map_with_int_key, 
&allocator).Move(), allocator);
+
+    std::string jsonStr;
+    ASSERT_TRUE(RapidJsonUtil::ToJson(doc, &jsonStr));
+
+    // deserialize
+    rapidjson::Document doc2;
+    ASSERT_TRUE(RapidJsonUtil::FromJson(jsonStr, &doc2));
+
+    ASSERT_EQ(str_value, RapidJsonUtil::DeserializeKeyValue<std::string>(doc2, 
"name", ""));
+    ASSERT_EQ(int_value, RapidJsonUtil::DeserializeKeyValue<int32_t>(doc2, 
"age", -1));
+    ASSERT_EQ(vector_value,
+              RapidJsonUtil::DeserializeKeyValue<std::vector<int>>(doc2, 
"vector_value", {}));
+
+    auto de_map_value =
+        RapidJsonUtil::DeserializeKeyValue<std::map<std::string, 
double>>(doc2, "map_value", {});
+    ASSERT_EQ(map_value, de_map_value);
+
+    auto de_vector_of_vector = 
RapidJsonUtil::DeserializeKeyValue<std::vector<std::vector<int>>>(
+        doc2, "vector_of_vector", {});
+    ASSERT_EQ(vector_of_vector, de_vector_of_vector);
+
+    auto de_vector_of_map =
+        RapidJsonUtil::DeserializeKeyValue<std::vector<std::map<std::string, 
double>>>(
+            doc2, "vector_of_map", {});
+    ASSERT_EQ(vector_of_map, de_vector_of_map);
+
+    auto de_map_of_vector =
+        RapidJsonUtil::DeserializeKeyValue<std::map<std::string, 
std::vector<int>>>(
+            doc2, "map_of_vector", {});
+    ASSERT_EQ(map_of_vector, de_map_of_vector);
+
+    auto de_null_value =
+        RapidJsonUtil::DeserializeKeyValue<std::optional<int64_t>>(doc2, 
"null_value");
+    ASSERT_EQ(null_value, de_null_value);
+
+    auto de_null_value_with_default = 
RapidJsonUtil::DeserializeKeyValue<std::optional<int64_t>>(
+        doc2, "null_value", /*default_value=*/std::optional<int64_t>(2333));
+    ASSERT_EQ(2333, de_null_value_with_default.value());
+
+    auto de_optional_value =
+        RapidJsonUtil::DeserializeKeyValue<std::optional<std::string>>(doc2, 
"optional_value");
+    ASSERT_EQ(optional_value, de_optional_value);
+
+    auto de_map_with_int_key = 
RapidJsonUtil::DeserializeKeyValue<std::map<int32_t, int64_t>>(
+        doc2, "map_with_int_key", {});
+    ASSERT_EQ(map_with_int_key, de_map_with_int_key);
+
+    // test non exist key, will use default value
+    double non_exist_value = 0.0;
+    non_exist_value = RapidJsonUtil::DeserializeKeyValue<double>(doc2, 
"non_exist_key", 2.333);
+    ASSERT_EQ(2.333, non_exist_value);
+}
+
+TEST(RapidJsonUtilTest, TestMapJsonString) {
+    std::map<std::string, std::string> m1 = {{"key1", "value1"}, {"key2", 
"value2"}};
+    std::string result;
+    ASSERT_OK(RapidJsonUtil::ToJsonString(m1, &result));
+    ASSERT_EQ(result, "{\"key1\":\"value1\",\"key2\":\"value2\"}");
+
+    std::map<std::string, std::string> m2;
+    ASSERT_OK(RapidJsonUtil::FromJsonString(result, &m2));
+    ASSERT_EQ(m1, m2);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/string_utils.cpp 
b/src/paimon/common/utils/string_utils.cpp
new file mode 100644
index 0000000..5b40589
--- /dev/null
+++ b/src/paimon/common/utils/string_utils.cpp
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/string_utils.h"
+
+#include <algorithm>
+#include <cctype>
+#include <ctime>
+#include <iomanip>
+#include <iterator>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/status.h"
+
+namespace paimon {
+std::string StringUtils::Replace(const std::string& text, const std::string& 
search_string,
+                                 const std::string& replacement, int32_t max) {
+    std::string str = text;
+    size_t pos = str.find(search_string);
+    int32_t count = 0;
+    while (pos != std::string::npos && (count < max || max == -1)) {
+        str.replace(pos, search_string.size(), replacement);
+        pos = str.find(search_string, pos + replacement.size());
+        count++;
+    }
+    return str;
+}
+
+std::string StringUtils::ReplaceLast(const std::string& text, const 
std::string& old_str,
+                                     const std::string& new_str) {
+    std::string str = text;
+    size_t pos = str.rfind(old_str);
+    if (pos != std::string::npos) {
+        str.replace(pos, old_str.size(), new_str);
+    }
+    return str;
+}
+
+bool StringUtils::StartsWith(const std::string& str, const std::string& 
prefix, size_t start_pos) {
+    return (str.size() >= prefix.size()) && (str.compare(start_pos, 
prefix.size(), prefix) == 0);
+}
+bool StringUtils::EndsWith(const std::string& str, const std::string& suffix) {
+    size_t s1 = str.size();
+    size_t s2 = suffix.size();
+    return (s1 >= s2) && (str.compare(s1 - s2, s2, suffix) == 0);
+}
+bool StringUtils::IsNullOrWhitespaceOnly(const std::string& str) {
+    if (str.empty()) {
+        return true;
+    }
+    for (char c : str) {
+        if (!std::isspace(static_cast<unsigned char>(c))) {
+            return false;
+        }
+    }
+    return true;
+}
+
+void StringUtils::Trim(std::string* str) {
+    str->erase(str->find_last_not_of(' ') + 1);
+    str->erase(0, str->find_first_not_of(' '));
+}
+
+std::string StringUtils::ToLowerCase(const std::string& str) {
+    std::string result;
+    result.reserve(str.length());
+    std::transform(str.begin(), str.end(), std::back_inserter(result),
+                   [](unsigned char c) { return std::tolower(c); });
+    return result;
+}
+
+std::string StringUtils::ToUpperCase(const std::string& str) {
+    std::string result;
+    result.reserve(str.length());
+    std::transform(str.begin(), str.end(), std::back_inserter(result),
+                   [](unsigned char c) { return std::toupper(c); });
+    return result;
+}
+
+std::vector<std::string> StringUtils::Split(const std::string& text, const 
std::string& sep_str,
+                                            bool ignore_empty) {
+    std::vector<std::string> vec;
+    if (sep_str.empty()) {
+        // invalid case, do not split.
+        vec.emplace_back(text);
+        return vec;
+    }
+    size_t n = 0, old = 0;
+    while (n != std::string::npos) {
+        n = text.find(sep_str, n);
+        if (n != std::string::npos) {
+            if (!ignore_empty || n != old) {
+                vec.emplace_back(text.substr(old, n - old));
+            }
+            n += sep_str.length();
+            old = n;
+        }
+    }
+
+    if (!ignore_empty || old < text.length()) {
+        vec.emplace_back(text.substr(old, text.length() - old));
+    }
+    return vec;
+}
+
+std::vector<std::vector<std::string>> StringUtils::Split(const std::string& 
text,
+                                                         const std::string& 
delim1,
+                                                         const std::string& 
delim2) {
+    std::vector<std::vector<std::string>> result;
+    std::vector<std::string> split_parts = Split(text, delim1);
+    result.reserve(split_parts.size());
+    for (auto& part : split_parts) {
+        result.emplace_back(Split(part, delim2));
+    }
+    return result;
+}
+
+Result<int32_t> StringUtils::StringToDate(const std::string& str) {
+    auto int_value = StringToValue<int32_t>(str);
+    if (int_value) {
+        return int_value.value();
+    }
+    std::tm timeinfo = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, nullptr};
+    std::istringstream ss(str);
+    ss >> std::get_time(&timeinfo, "%Y-%m-%d");
+    if (ss.fail()) {
+        return Status::Invalid(fmt::format("failed to convert string '{}' to 
date", str));
+    }
+    int32_t orig_mon = timeinfo.tm_mon;
+    int32_t orig_mday = timeinfo.tm_mday;
+    std::time_t time = timegm(&timeinfo);
+    if (time == -1 || timeinfo.tm_mon != orig_mon || timeinfo.tm_mday != 
orig_mday) {
+        return Status::Invalid(fmt::format("failed to convert string '{}' to 
date", str));
+    }
+    static const int64_t SECONDS_PER_DAY = 86400l;  // = 24 * 60 * 60
+    return time / SECONDS_PER_DAY;
+}
+
+/// Parses a timestamp string into unix milliseconds.
+/// Supported formats: "yyyy-MM-dd", "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd 
HH:mm:ss.SSS".
+/// Uses the default local time zone, consistent with Java Paimon behavior.
+Result<int64_t> StringUtils::StringToTimestampMillis(const std::string& str) {
+    std::tm timeinfo{};
+    timeinfo.tm_isdst = -1;
+
+    // Try "yyyy-MM-dd HH:mm:ss" first (also matches "yyyy-MM-dd HH:mm:ss.SSS")
+    std::istringstream ss(str);
+    ss >> std::get_time(&timeinfo, "%Y-%m-%d %H:%M:%S");
+    int32_t millis_part = 0;
+
+    if (!ss.fail()) {
+        // Check for optional fractional seconds ".SSS"
+        if (ss.peek() == '.') {
+            ss.get();
+            std::string frac;
+            while (frac.size() < 3 && ss.peek() != 
std::char_traits<char>::eof() &&
+                   std::isdigit(static_cast<unsigned char>(ss.peek()))) {
+                frac += static_cast<char>(ss.get());
+            }
+            if (frac.empty()) {
+                return Status::Invalid(
+                    fmt::format("failed to convert string '{}' to timestamp, "
+                                "expected digits after '.'",
+                                str));
+            }
+            // Pad to 3 digits: "1" -> 100, "12" -> 120, "123" -> 123
+            while (frac.size() < 3) {
+                frac += '0';
+            }
+            auto parsed = StringToValue<int32_t>(frac);
+            if (parsed) {
+                millis_part = parsed.value();
+            }
+        }
+    } else {
+        // Fall back to "yyyy-MM-dd" (date only, time defaults to 00:00:00)
+        ss.clear();
+        ss.str(str);
+        timeinfo = std::tm{};
+        timeinfo.tm_isdst = -1;
+        ss >> std::get_time(&timeinfo, "%Y-%m-%d");
+        if (ss.fail()) {
+            return Status::Invalid(
+                fmt::format("failed to convert string '{}' to timestamp, "
+                            "supported formats: yyyy-MM-dd, yyyy-MM-dd 
HH:mm:ss, "
+                            "yyyy-MM-dd HH:mm:ss.SSS",
+                            str));
+        }
+    }
+
+    if (ss.peek() != std::char_traits<char>::eof()) {
+        return Status::Invalid(
+            fmt::format("failed to convert string '{}' to timestamp, "
+                        "unexpected trailing characters",
+                        str));
+    }
+
+    int32_t orig_mon = timeinfo.tm_mon;
+    int32_t orig_mday = timeinfo.tm_mday;
+    std::time_t time = mktime(&timeinfo);
+    if (time == -1 || timeinfo.tm_mon != orig_mon || timeinfo.tm_mday != 
orig_mday) {
+        return Status::Invalid(fmt::format("failed to convert string '{}' to 
timestamp", str));
+    }
+    return static_cast<int64_t>(time) * 1000 + millis_part;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/string_utils.h 
b/src/paimon/common/utils/string_utils.h
new file mode 100644
index 0000000..3c0906e
--- /dev/null
+++ b/src/paimon/common/utils/string_utils.h
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <cassert>
+#include <cctype>
+#include <charconv>
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+#include <iomanip>
+#include <optional>
+#include <set>
+#include <sstream>
+#include <string>
+#include <system_error>
+#include <vector>
+
+#include "fmt/core.h"
+#include "fmt/format.h"
+#include "fmt/ranges.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+#include "paimon/traits.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Utils for string.
+class PAIMON_EXPORT StringUtils {
+ public:
+    /// Replaces all occurrences of a string within another string.
+    ///
+    /// A `null` reference passed to this method is a no-op.
+    ///
+    /// <pre>
+    /// StringUtils::Replace(null, *, *)        = null
+    /// StringUtils::Replace("", *, *)          = ""
+    /// StringUtils::Replace("any", null, *)    = "any"
+    /// StringUtils::Replace("any", *, null)    = "any"
+    /// StringUtils::Replace("any", "", *)      = "any"
+    /// StringUtils::Replace("aba", "a", null)  = "aba"
+    /// StringUtils::Replace("aba", "a", "")    = "b"
+    /// StringUtils::Replace("aba", "a", "z")   = "zbz"
+    /// </pre>
+    ///
+    /// @see #replace(string text, string search_string, string replacement, 
int max)
+    /// @param text text to search and replace in, may be null
+    /// @param search_string the String to search for, may be null
+    /// @param replacement the String to replace it with, may be null
+    /// @return the text with any replacements processed, `null` if null 
string input
+    static std::string Replace(const std::string& text, const std::string& 
search_string,
+                               const std::string& replacement) {
+        return Replace(text, search_string, replacement, -1);
+    }
+
+    /// Replaces a String with another String inside a larger String, for the 
first `max` values of
+    /// the search String.
+    ///
+    /// A `null` reference passed to this method is a no-op.
+    ///
+    /// <pre>
+    /// StringUtils::Replace(null, *, *, *)         = null
+    /// StringUtils::Replace("", *, *, *)           = ""
+    /// StringUtils::Replace("any", null, *, *)     = "any"
+    /// StringUtils::Replace("any", *, null, *)     = "any"
+    /// StringUtils::Replace("any", "", *, *)       = "any"
+    /// StringUtils::Replace("any", *, *, 0)        = "any"
+    /// StringUtils::Replace("abaa", "a", null, -1) = "abaa"
+    /// StringUtils::Replace("abaa", "a", "", -1)   = "b"
+    /// StringUtils::Replace("abaa", "a", "z", 0)   = "abaa"
+    /// StringUtils::Replace("abaa", "a", "z", 1)   = "zbaa"
+    /// StringUtils::Replace("abaa", "a", "z", 2)   = "zbza"
+    /// StringUtils::Replace("abaa", "a", "z", -1)  = "zbzz"
+    /// </pre>
+    ///
+    /// @param text text to search and replace in, may be null
+    /// @param search_string the String to search for, may be null
+    /// @param replacement the String to replace it with, may be null
+    /// @param max maximum number of values to replace, or `-1` if no maximum
+    /// @return the text with any replacements processed, `null` if null 
string input
+    static std::string Replace(const std::string& text, const std::string& 
search_string,
+                               const std::string& replacement, int32_t max);
+
+    static std::string ReplaceLast(const std::string& text, const std::string& 
old_str,
+                                   const std::string& new_str);
+
+    static bool StartsWith(const std::string& str, const std::string& prefix, 
size_t start_pos = 0);
+
+    static bool EndsWith(const std::string& str, const std::string& suffix);
+
+    static bool IsNullOrWhitespaceOnly(const std::string& str);
+
+    static void Trim(std::string* str);
+
+    static std::string ToLowerCase(const std::string& str);
+    static std::string ToUpperCase(const std::string& str);
+
+    template <typename T>
+    static std::string VectorToString(const std::vector<T>& vec) {
+        std::vector<std::string> strs;
+        strs.reserve(vec.size());
+        for (const auto& value : vec) {
+            if constexpr (is_optional<T>::value) {
+                if (value == std::nullopt) {
+                    strs.emplace_back("null");
+                } else {
+                    strs.emplace_back(value.value().ToString());
+                }
+            } else if constexpr (is_pointer<T>::value) {
+                strs.emplace_back(value->ToString());
+            } else {
+                strs.emplace_back(value.ToString());
+            }
+        }
+        return fmt::format("[{}]", fmt::join(strs, ", "));
+    }
+
+    static std::vector<std::string> Split(const std::string& text, const 
std::string& sep_str,
+                                          bool ignore_empty = true);
+
+    static std::vector<std::vector<std::string>> Split(const std::string& text,
+                                                       const std::string& 
delim1,
+                                                       const std::string& 
delim2);
+
+    static Result<int32_t> StringToDate(const std::string& str);
+
+    static Result<int64_t> StringToTimestampMillis(const std::string& str);
+
+    template <typename T>
+    static std::optional<T> StringToValue(const std::string& str);
+};
+
+template <typename T>
+std::optional<T> StringUtils::StringToValue(const std::string& str) {
+    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially 
copyable");
+    if (str.length() == 0) {
+        return std::nullopt;
+    }
+
+    if constexpr (std::is_same_v<T, int8_t> || std::is_same_v<T, int16_t> ||
+                  std::is_same_v<T, int32_t> || std::is_same_v<T, int64_t> ||
+                  std::is_same_v<T, uint8_t> || std::is_same_v<T, uint16_t> ||
+                  std::is_same_v<T, uint32_t> || std::is_same_v<T, uint64_t>) {
+        T value{};
+        int32_t base = 10;
+        auto str_data = str.data();
+        auto str_size = str.size();
+        if constexpr (std::is_unsigned_v<T>) {
+            if (str_data[0] == '-') {
+                return std::nullopt;
+            }
+        }
+        auto result = std::from_chars(str_data, str_data + str_size, value, 
base);
+        if (result.ec != std::errc() || result.ptr != str_data + str_size) {
+            return std::nullopt;
+        } else {
+            return value;
+        }
+    } else if constexpr (std::is_same_v<T, double> || std::is_same_v<T, 
float>) {
+        T value;
+        std::istringstream iss(str);
+        iss >> value;
+        if (iss && iss.eof()) {
+            return value;
+        }
+        return std::nullopt;
+    } else if constexpr (std::is_same_v<T, bool>) {
+        static const std::set<std::string> TRUE_STRINGS = {"t", "true", "y", 
"yes", "1"};
+        static const std::set<std::string> FALSE_STRINGS = {"f", "false", "n", 
"no", "0"};
+        std::string lower_case = ToLowerCase(str);
+        if (TRUE_STRINGS.find(lower_case) != TRUE_STRINGS.end()) {
+            return true;
+        } else if (FALSE_STRINGS.find(lower_case) != FALSE_STRINGS.end()) {
+            return false;
+        } else {
+            return std::nullopt;
+        }
+    } else {
+        assert(false);
+        return std::nullopt;
+    }
+}
+
+template <>
+inline std::optional<std::string> 
StringUtils::StringToValue<std::string>(const std::string& str) {
+    return str;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/string_utils_test.cpp 
b/src/paimon/common/utils/string_utils_test.cpp
new file mode 100644
index 0000000..11c3e00
--- /dev/null
+++ b/src/paimon/common/utils/string_utils_test.cpp
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/string_utils.h"
+
+#include <limits>
+#include <memory>
+
+#include "gtest/gtest.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/testing/utils/timezone_guard.h"
+
+namespace paimon::test {
+class StringUtilsTest : public ::testing::Test {
+ public:
+    void SetUp() override {}
+    void TearDown() override {}
+
+ private:
+    template <typename T>
+    void CheckBoundary(const std::string& max_value_str, const std::string& 
min_value_str);
+    template <typename T>
+    void CheckOverFlowAndUnderFlow(const std::string& over_flow, const 
std::string& under_flow);
+};
+
+template <typename T>
+void StringUtilsTest::CheckBoundary(const std::string& max_value_str,
+                                    const std::string& min_value_str) {
+    ASSERT_EQ(std::numeric_limits<T>::min(), 
StringUtils::StringToValue<T>(min_value_str).value());
+    ASSERT_EQ(std::numeric_limits<T>::max(), 
StringUtils::StringToValue<T>(max_value_str).value());
+}
+
+template <>
+void StringUtilsTest::CheckBoundary<double>(const std::string& max_value_str,
+                                            const std::string& min_value_str) {
+    ASSERT_NEAR(-std::numeric_limits<double>::max(),
+                StringUtils::StringToValue<double>(min_value_str).value(), 
0.00001e+308);
+    ASSERT_NEAR(std::numeric_limits<double>::max(),
+                StringUtils::StringToValue<double>(max_value_str).value(), 
0.00001e+308);
+}
+
+template <>
+void StringUtilsTest::CheckBoundary<float>(const std::string& max_value_str,
+                                           const std::string& min_value_str) {
+    ASSERT_NEAR(-std::numeric_limits<float>::max(),
+                StringUtils::StringToValue<float>(min_value_str).value(), 
0.00001e+38);
+    ASSERT_NEAR(std::numeric_limits<float>::max(),
+                StringUtils::StringToValue<float>(max_value_str).value(), 
0.00001e+38);
+}
+
+template <typename T>
+void StringUtilsTest::CheckOverFlowAndUnderFlow(const std::string& over_flow,
+                                                const std::string& under_flow) 
{
+    ASSERT_EQ(StringUtils::StringToValue<T>(over_flow), std::nullopt);
+    ASSERT_EQ(StringUtils::StringToValue<T>(under_flow), std::nullopt);
+}
+
+TEST_F(StringUtilsTest, TestReplaceAll) {
+    {
+        std::string origin = "how is is you";
+        std::string expect = "how are are you";
+        std::string result = StringUtils::Replace(origin, "is", "are");
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::string origin = "aabac";
+        std::string expect = "aaaabaac";
+        std::string result = StringUtils::Replace(origin, "a", "aa");
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::string origin = "aaaabaac";
+        std::string expect = "aabac";
+        std::string result = StringUtils::Replace(origin, "aa", "a");
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::string origin = "aaaabaac";
+        std::string expect = "aaaabaac";
+        std::string result = StringUtils::Replace(origin, "abc", "a");
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::string origin = "aaaaaaaa";
+        std::string expect = "bbbb";
+        std::string result = StringUtils::Replace(origin, "aa", "b");
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::string origin = "aaaaaaaaa";
+        std::string expect = "bbbba";
+        std::string result = StringUtils::Replace(origin, "aa", "b");
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::string origin = "/home/admin/ops";
+        std::string expect = R"(\/home\/admin\/ops)";
+        std::string result = StringUtils::Replace(origin, "/", "\\/");
+        ASSERT_EQ(expect, result);
+    }
+}
+
+TEST_F(StringUtilsTest, TestReplaceLast) {
+    {
+        std::string origin = "a/b/c//";
+        std::string expect = "a/b/c/_";
+        std::string actual = StringUtils::ReplaceLast(origin, "/", "_");
+        ASSERT_EQ(expect, actual);
+    }
+    {
+        std::string origin = "a/b/c//";
+        std::string expect = "a/b/c//";
+        std::string actual = StringUtils::ReplaceLast(origin, "_", "/");
+        ASSERT_EQ(expect, actual);
+    }
+
+    {
+        std::string origin = "how is is you";
+        std::string expect = "how is are you";
+        std::string actual = StringUtils::ReplaceLast(origin, "is", "are");
+        ASSERT_EQ(expect, actual);
+    }
+}
+
+TEST_F(StringUtilsTest, TestReplaceWithMaxCount) {
+    {
+        std::string origin = "how is is you";
+        std::string expect = "how are is you";
+        std::string result = StringUtils::Replace(origin, "is", "are", 1);
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::string origin = "aabac";
+        std::string expect = "aaaabac";
+        std::string result = StringUtils::Replace(origin, "a", "aa", 2);
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::string origin = "aaaabaac";
+        std::string expect = "aaaabaac";
+        std::string result = StringUtils::Replace(origin, "aa", "a", 0);
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::string origin = "aaaaaaaa";
+        std::string expect = "bbbb";
+        std::string result = StringUtils::Replace(origin, "aa", "b", 100);
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::string origin = "aaaaaaaaa";
+        std::string expect = "bbbaaa";
+        std::string result = StringUtils::Replace(origin, "aa", "b", 3);
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::string origin = "/home/admin/ops";
+        std::string expect = "\\/home\\/admin/ops";
+        std::string result = StringUtils::Replace(origin, "/", "\\/", 2);
+        ASSERT_EQ(expect, result);
+    }
+}
+
+TEST_F(StringUtilsTest, TestIsNullOrWhitespaceOnly) {
+    {
+        std::string str = "";
+        auto ret = StringUtils::IsNullOrWhitespaceOnly(str);
+        ASSERT_TRUE(ret);
+    }
+    {
+        std::string str = "a a a a";
+        auto ret = StringUtils::IsNullOrWhitespaceOnly(str);
+        ASSERT_FALSE(ret);
+    }
+    {
+        std::string str = "     ";
+        auto ret = StringUtils::IsNullOrWhitespaceOnly(str);
+        ASSERT_TRUE(ret);
+    }
+    {
+        std::string str = "\n";
+        auto ret = StringUtils::IsNullOrWhitespaceOnly(str);
+        ASSERT_TRUE(ret);
+    }
+    {
+        std::string str = "\t";
+        auto ret = StringUtils::IsNullOrWhitespaceOnly(str);
+        ASSERT_TRUE(ret);
+    }
+}
+
+TEST_F(StringUtilsTest, TestToLowerCase) {
+    {
+        std::string str = "HDGF";
+        ASSERT_EQ("hdgf", StringUtils::ToLowerCase(str));
+    }
+    {
+        std::string str = "ab CD ffg +8";
+        ASSERT_EQ("ab cd ffg +8", StringUtils::ToLowerCase(str));
+    }
+    {
+        std::string str = "";
+        ASSERT_EQ("", StringUtils::ToLowerCase(str));
+    }
+}
+
+TEST_F(StringUtilsTest, TestToUpperCase) {
+    {
+        std::string str = "hdgf";
+        ASSERT_EQ("HDGF", StringUtils::ToUpperCase(str));
+    }
+    {
+        std::string str = "AB cd ffg +8";
+        ASSERT_EQ("AB CD FFG +8", StringUtils::ToUpperCase(str));
+    }
+    {
+        std::string str = "";
+        ASSERT_EQ("", StringUtils::ToUpperCase(str));
+    }
+}
+
+TEST_F(StringUtilsTest, TestStartsWith) {
+    {
+        std::string str = "abcde";
+        ASSERT_TRUE(StringUtils::StartsWith(str, "ab"));
+    }
+    {
+        std::string str = "abcde";
+        ASSERT_FALSE(StringUtils::StartsWith(str, "ba"));
+    }
+    {
+        std::string str = "abcde";
+        ASSERT_TRUE(StringUtils::StartsWith(str, "bc", /*start_pos=*/1));
+    }
+    {
+        std::string str = "abcde";
+        ASSERT_FALSE(StringUtils::StartsWith(str, "bc", /*start_pos=*/3));
+    }
+    {
+        std::string str = "";
+        ASSERT_FALSE(StringUtils::StartsWith(str, "bc"));
+    }
+    {
+        std::string str = "";
+        ASSERT_TRUE(StringUtils::StartsWith(str, ""));
+    }
+}
+TEST_F(StringUtilsTest, TestEndsWith) {
+    {
+        std::string str = "abcde";
+        ASSERT_TRUE(StringUtils::EndsWith(str, "de"));
+    }
+    {
+        std::string str = "abcde";
+        ASSERT_FALSE(StringUtils::EndsWith(str, "ba"));
+    }
+    {
+        std::string str = "";
+        ASSERT_FALSE(StringUtils::EndsWith(str, "bc"));
+    }
+    {
+        std::string str = "";
+        ASSERT_TRUE(StringUtils::EndsWith(str, ""));
+    }
+}
+
+TEST_F(StringUtilsTest, TestSplit) {
+    {
+        std::vector<std::string> expect = {"aabbcc"};
+        std::vector<std::string> result = StringUtils::Split("aabbcc", "");
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::vector<std::string> expect = {"aa", "bb", "cc"};
+        std::vector<std::string> result = StringUtils::Split("aa,bb,cc", ",");
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::vector<std::string> expect = {"aa", "bb", "cc"};
+        std::vector<std::string> result =
+            StringUtils::Split("aa,bb,,cc", ",", /*ignore_empty=*/true);
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::vector<std::string> expect = {"aa", "bb", "", "cc"};
+        std::vector<std::string> result =
+            StringUtils::Split("aa,bb,,cc", ",", /*ignore_empty=*/false);
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::vector<std::vector<std::string>> expect = {
+            {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}};
+        std::vector<std::vector<std::string>> result = StringUtils::Split(
+            "key1=value1/key2=value2/key3=value3", std::string("/"), 
std::string("="));
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::vector<std::vector<std::string>> expect = {{"key1"}, {"key2"}, 
{"key3", "value3"}};
+        std::vector<std::vector<std::string>> result =
+            StringUtils::Split("key1/key2=/key3=value3", std::string("/"), 
std::string("="));
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::vector<std::vector<std::string>> expect = {
+            {"key1"}, {"key2", "   "}, {"key3", "value3"}};
+        std::vector<std::vector<std::string>> result =
+            StringUtils::Split("key1/key2=   /key3=value3", std::string("/"), 
std::string("="));
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::vector<std::vector<std::string>> expect = {{"key1", "value1"}, 
{"key3", "value3"}};
+        std::vector<std::vector<std::string>> result =
+            StringUtils::Split("key1=value1//key3=value3", std::string("/"), 
std::string("="));
+        ASSERT_EQ(expect, result);
+    }
+    {
+        std::vector<std::vector<std::string>> expect = {};
+        std::vector<std::vector<std::string>> result =
+            StringUtils::Split("", std::string("/"), std::string("="));
+        ASSERT_EQ(expect, result);
+    }
+}
+
+TEST_F(StringUtilsTest, TestStringToValueSimple) {
+    ASSERT_EQ(static_cast<int32_t>(233), 
StringUtils::StringToValue<int32_t>("233").value());
+    ASSERT_EQ(static_cast<int8_t>(10), 
StringUtils::StringToValue<int8_t>("10").value());
+    ASSERT_EQ(std::nullopt, StringUtils::StringToValue<int8_t>("1024"));
+    ASSERT_EQ(static_cast<int64_t>(34785895352),
+              StringUtils::StringToValue<int64_t>("34785895352").value());
+    ASSERT_EQ(std::nullopt, StringUtils::StringToValue<int32_t>("abc"));
+    ASSERT_EQ(std::nullopt, StringUtils::StringToValue<int32_t>(""));
+
+    ASSERT_EQ(true, StringUtils::StringToValue<bool>("1").value());
+    ASSERT_EQ(true, StringUtils::StringToValue<bool>("true").value());
+    ASSERT_EQ(true, StringUtils::StringToValue<bool>("TRUE").value());
+    ASSERT_EQ(false, StringUtils::StringToValue<bool>("0").value());
+    ASSERT_EQ(false, StringUtils::StringToValue<bool>("false").value());
+    ASSERT_EQ(false, StringUtils::StringToValue<bool>("FALSE").value());
+    ASSERT_EQ(std::nullopt, StringUtils::StringToValue<bool>("123"));
+}
+
+TEST_F(StringUtilsTest, TestStringToValueWithBoundaryValue) {
+    {
+        // normal case
+        CheckBoundary<int8_t>("127", "-128");
+        CheckBoundary<int16_t>("32767", "-32768");
+        CheckBoundary<int32_t>("2147483647", "-2147483648");
+        CheckBoundary<uint32_t>("4294967295", "0");
+        CheckBoundary<int64_t>("9223372036854775807", "-9223372036854775808");
+        CheckBoundary<uint64_t>("18446744073709551615", "0");
+        CheckBoundary<float>("3.4028235e+38", "-3.4028235e+38");
+        CheckBoundary<double>("1.7976931348623157e+308", 
"-1.7976931348623157e+308");
+    }
+    {
+        // overflow or underflow
+        CheckOverFlowAndUnderFlow<int8_t>("128", "-129");
+        CheckOverFlowAndUnderFlow<int16_t>("32768", "-32769");
+        CheckOverFlowAndUnderFlow<int32_t>("2147483648", "-2147483649");
+        CheckOverFlowAndUnderFlow<uint32_t>("4294967296", "-1");
+        CheckOverFlowAndUnderFlow<int64_t>("9223372036854775808", 
"-9223372036854775809");
+        CheckOverFlowAndUnderFlow<uint64_t>("18446744073709551616", "-1");
+
+        CheckOverFlowAndUnderFlow<float>("3.4028235e+39", "-3.4028235e+39");
+        CheckOverFlowAndUnderFlow<double>("1.7976931348623157e+309", 
"-1.7976931348623157e+309");
+    }
+}
+
+TEST_F(StringUtilsTest, TestStringToDate) {
+    {
+        ASSERT_OK_AND_ASSIGN(auto date, 
StringUtils::StringToDate("2147483647"));
+        ASSERT_EQ(date, 2147483647);
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(auto date, 
StringUtils::StringToDate("-2147483648"));
+        ASSERT_EQ(date, -2147483648);
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(auto date, 
StringUtils::StringToDate("1970-01-01"));
+        ASSERT_EQ(date, 0);
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(auto date, 
StringUtils::StringToDate("0000-01-01"));
+        ASSERT_EQ(date, -719528);
+    }
+    {
+        ASSERT_OK_AND_ASSIGN(auto date, 
StringUtils::StringToDate("9999-12-31"));
+        ASSERT_EQ(date, 2932896);
+    }
+    // invalid str
+    ASSERT_NOK(StringUtils::StringToDate("9223372036854775807"));
+    ASSERT_NOK(StringUtils::StringToDate("11970-01-02"));
+    ASSERT_NOK(StringUtils::StringToDate("-1970-01-02"));
+    ASSERT_NOK(StringUtils::StringToDate(""));
+    ASSERT_NOK(StringUtils::StringToDate("1970-XX-02"));
+    ASSERT_NOK(StringUtils::StringToDate("2023-02-29"));
+    ASSERT_NOK(StringUtils::StringToDate("2024-00-01"));
+}
+
+TEST_F(StringUtilsTest, TestStringToTimestampMillis) {
+    TimezoneGuard tz_guard("Asia/Shanghai");
+    // "yyyy-MM-dd HH:mm:ss" format
+    {
+        ASSERT_OK_AND_ASSIGN(int64_t millis,
+                             StringUtils::StringToTimestampMillis("1970-01-01 
00:00:00"));
+        ASSERT_EQ(millis, -28800000);
+    }
+    // "yyyy-MM-dd HH:mm:ss.SSS" format
+    {
+        ASSERT_OK_AND_ASSIGN(int64_t millis1,
+                             StringUtils::StringToTimestampMillis("2023-06-01 
00:00:00.000"));
+        ASSERT_OK_AND_ASSIGN(int64_t millis2,
+                             StringUtils::StringToTimestampMillis("2023-06-01 
00:00:00.123"));
+        ASSERT_EQ(millis2 - millis1, 123);
+    }
+    // "yyyy-MM-dd" format (date only, time defaults to 00:00:00)
+    {
+        ASSERT_OK_AND_ASSIGN(int64_t millis1, 
StringUtils::StringToTimestampMillis("2023-06-01"));
+        ASSERT_OK_AND_ASSIGN(int64_t millis2,
+                             StringUtils::StringToTimestampMillis("2023-06-01 
00:00:00"));
+        ASSERT_EQ(millis1, millis2);
+    }
+    // Fractional second padding: "1" -> 100ms, "12" -> 120ms
+    {
+        ASSERT_OK_AND_ASSIGN(int64_t millis_base,
+                             StringUtils::StringToTimestampMillis("2023-06-01 
12:00:00.000"));
+        ASSERT_OK_AND_ASSIGN(int64_t millis_1,
+                             StringUtils::StringToTimestampMillis("2023-06-01 
12:00:00.1"));
+        ASSERT_EQ(millis_1 - millis_base, 100);
+        ASSERT_OK_AND_ASSIGN(int64_t millis_12,
+                             StringUtils::StringToTimestampMillis("2023-06-01 
12:00:00.12"));
+        ASSERT_EQ(millis_12 - millis_base, 120);
+    }
+    // Invalid strings
+    ASSERT_NOK(StringUtils::StringToTimestampMillis(""));
+    ASSERT_NOK(StringUtils::StringToTimestampMillis("not-a-date"));
+    ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-XX-01 00:00:00"));
+    // Trailing garbage
+    ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-06-01 00:00:00abc"));
+    ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-06-01 
00:00:00.12xyz"));
+    ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-06-01 00:00:00 "));
+    ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-06-01 00:00:00.12 
"));
+    // Trailing dot with no digits
+    ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-06-01 00:00:00."));
+    ASSERT_NOK(StringUtils::StringToTimestampMillis("2023-02-29 12:00:00"));
+    ASSERT_NOK(StringUtils::StringToTimestampMillis("2024-00-15 00:00:00"));
+}
+
+TEST_F(StringUtilsTest, TestVectorToString) {
+    class A {
+     public:
+        explicit A(int32_t value) : value_(value) {}
+        std::string ToString() const {
+            return std::to_string(value_);
+        }
+
+     private:
+        int32_t value_;
+    };
+
+    {
+        std::vector<A> vec = {A(10), A(20), A(30)};
+        ASSERT_EQ(StringUtils::VectorToString(vec), "[10, 20, 30]");
+    }
+    {
+        std::vector<std::optional<A>> vec = {A(10), A(20), A(30), 
std::nullopt};
+        ASSERT_EQ(StringUtils::VectorToString(vec), "[10, 20, 30, null]");
+    }
+    {
+        std::vector<std::shared_ptr<A>> vec = {std::make_shared<A>(10), 
std::make_shared<A>(20),
+                                               std::make_shared<A>(30)};
+        ASSERT_EQ(StringUtils::VectorToString(vec), "[10, 20, 30]");
+    }
+}
+}  // namespace paimon::test
diff --git a/src/paimon/testing/utils/timezone_guard.h 
b/src/paimon/testing/utils/timezone_guard.h
new file mode 100644
index 0000000..2b88532
--- /dev/null
+++ b/src/paimon/testing/utils/timezone_guard.h
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdlib>
+#include <ctime>
+#include <string>
+
+namespace paimon::test {
+class TimezoneGuard {
+ public:
+    explicit TimezoneGuard(const std::string& tz) {
+        const char* original = std::getenv("TZ");
+        if (original) {
+            original_tz_ = original;
+        } else {
+            original_tz_.clear();
+        }
+
+        setenv("TZ", tz.c_str(), /*replace=*/1);
+        tzset();
+    }
+
+    ~TimezoneGuard() {
+        if (original_tz_.empty()) {
+            unsetenv("TZ");
+        } else {
+            setenv("TZ", original_tz_.c_str(), /*replace=*/1);
+        }
+        tzset();
+    }
+
+ private:
+    std::string original_tz_;
+};
+
+}  // namespace paimon::test

Reply via email to