This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new f9008f2  feat: add GenericRow, RecordBatch, MemorySize and 
TimeDuration (#28)
f9008f2 is described below

commit f9008f2111359df84f82df33f54dc888ab0e4f31
Author: lszskye <[email protected]>
AuthorDate: Fri May 29 11:13:00 2026 +0800

    feat: add GenericRow, RecordBatch, MemorySize and TimeDuration (#28)
    
    Squash merge PR #28.
---
 include/paimon/record_batch.h                    | 159 +++++++++++++++
 src/paimon/common/data/generic_row.h             | 237 +++++++++++++++++++++++
 src/paimon/common/data/generic_row_test.cpp      | 110 +++++++++++
 src/paimon/common/data/record_batch.cpp          | 163 ++++++++++++++++
 src/paimon/common/data/record_batch_test.cpp     | 122 ++++++++++++
 src/paimon/common/options/memory_size.cpp        | 130 +++++++++++++
 src/paimon/common/options/memory_size.h          |  82 ++++++++
 src/paimon/common/options/memory_size_test.cpp   | 136 +++++++++++++
 src/paimon/common/options/time_duration.cpp      | 162 ++++++++++++++++
 src/paimon/common/options/time_duration.h        |  92 +++++++++
 src/paimon/common/options/time_duration_test.cpp |  87 +++++++++
 11 files changed, 1480 insertions(+)

diff --git a/include/paimon/record_batch.h b/include/paimon/record_batch.h
new file mode 100644
index 0000000..9ea8a4b
--- /dev/null
+++ b/include/paimon/record_batch.h
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+struct ArrowArray;
+
+namespace paimon {
+/// `RecordBatch` bundles a batch of rows that share one schema together with the partition and
+/// bucket they belong to. Every row carries a change kind (`INSERT`, `UPDATE_BEFORE`,
+/// `UPDATE_AFTER` or `DELETE`), and the row data is stored in the Apache Arrow format. It is
+/// typically used in streaming-write or batch-processing scenarios.
+/// @note Do not construct this class directly; build it with `RecordBatchBuilder`, which
+/// validates its input.
+class PAIMON_EXPORT RecordBatch {
+ public:
+    /// Kind of change described by a single row of the batch.
+    enum class PAIMON_EXPORT RowKind : int8_t {
+        INSERT = 0,
+        UPDATE_BEFORE = 1,
+        UPDATE_AFTER = 2,
+        DELETE = 3,
+    };
+
+    /// Creates a batch and takes over the content of `data`.
+    ///
+    /// @note 1. Data cannot be reused, as it will be released after Write.
+    /// @note 2. If a partition field's value is null, it should be represented as
+    /// "__DEFAULT_PARTITION__" (or a user-defined default value) in the partition map. However,
+    /// in the Arrow array, partition column values MUST NOT be set to "__DEFAULT_PARTITION__"
+    /// (or a user-defined default value); they should be set as actual nulls instead. Putting
+    /// the placeholder into the array may lead to behavioral inconsistencies between C++ Paimon
+    /// and Java Paimon.
+    ///
+    /// @param partition Map of partition column names to their string values.
+    /// @param bucket Bucket id of the batch.
+    /// @param row_kinds Change kind of each row.
+    /// @param data Arrow C Data Interface array holding the rows.
+    RecordBatch(const std::map<std::string, std::string>& partition, int32_t bucket,
+                const std::vector<RowKind>& row_kinds, ::ArrowArray* data);
+
+    /// Releases the Arrow array still owned by this batch, if any.
+    ~RecordBatch();
+
+    /// Move-only type: moving hands the Arrow array over and leaves the source without data.
+    RecordBatch(RecordBatch&& other);
+    RecordBatch& operator=(RecordBatch&& other);
+
+    RecordBatch(const RecordBatch&) = delete;
+    RecordBatch& operator=(const RecordBatch&) = delete;
+
+    /// @return the partition map given at construction.
+    const std::map<std::string, std::string>& GetPartition() const {
+        return partition_;
+    }
+
+    /// @return the bucket id currently stored in the batch.
+    int32_t GetBucket() const {
+        return bucket_;
+    }
+
+    /// @return the Arrow array owned by the batch, or null once the batch has been moved from.
+    ::ArrowArray* GetData() const {
+        return data_;
+    }
+
+    /// @return the change kind of each row, as given at construction.
+    const std::vector<RowKind>& GetRowKind() const {
+        return row_kinds_;
+    }
+
+    /// Overwrites the bucket id stored in the batch.
+    void SetBucket(int32_t bucket) {
+        bucket_ = bucket;
+    }
+
+    /// @return false while the bucket still holds the builder's "not specified" sentinel.
+    bool HasSpecifiedBucket() const;
+
+ private:
+    std::map<std::string, std::string> partition_;
+    int32_t bucket_;
+    std::vector<RowKind> row_kinds_;
+    ::ArrowArray* data_;
+};
+
+/// Builder for constructing `RecordBatch` instances.
+///
+/// This class provides a convenient way to build `RecordBatch` objects by setting
+/// various properties such as data, row kinds, partition information, and bucket id.
+class PAIMON_EXPORT RecordBatchBuilder {
+ public:
+    /// Constructs a `RecordBatchBuilder` with Arrow data.
+    ///
+    /// @note The `data` must conform to table schema:
+    ///       - Each array in `data` corresponds to a field in table schema.
+    ///       - If a field in table schema is marked as non-nullable (`nullable = false`),
+    ///         the corresponding array in `data` must have zero null entries.
+    ///
+    /// @note Consistency between `data` and table schema will be validated during the write
+    /// process.
+    ///
+    /// @param data ArrowArray struct containing the columnar data (via C Data Interface). Its
+    ///        content is moved into the builder.
+    explicit RecordBatchBuilder(::ArrowArray* data);
+
+    /// Releases any Arrow data the builder still holds.
+    ~RecordBatchBuilder();
+
+    /// Move new Arrow data into the builder, replacing (and releasing) existing data.
+    /// @param data New Arrow array data.
+    RecordBatchBuilder& MoveData(::ArrowArray* data);
+
+    /// Set the row kinds for each record in the batch.
+    /// @param row_kinds A vector of row kinds, including INSERT, UPDATE_BEFORE, UPDATE_AFTER and
+    /// DELETE. If not set, default value is `INSERT`.
+    /// @note `row_kinds` must have the same length as the number of records in the data;
+    /// `Finish()` fails otherwise.
+    RecordBatchBuilder& SetRowKinds(const std::vector<RecordBatch::RowKind>& row_kinds);
+
+    /// Set the partition information for this record batch.
+    /// @param partition Map of partition column names to their string values.
+    RecordBatchBuilder& SetPartition(const std::map<std::string, std::string>& partition);
+
+    /// Set the bucket id for this record batch. If not set, default value is `-2147483648`
+    /// (i.e., `HasSpecifiedBucket()` returns false), and the bucket will be auto-resolved
+    /// at write time based on the table's bucket option:
+    ///
+    /// - **Unaware-bucket mode** (table option `bucket = -1`, append-only table without
+    ///   primary keys): the bucket will be auto-filled with `UNAWARE_BUCKET (0)`. If the
+    ///   caller does specify a bucket, it MUST be `UNAWARE_BUCKET (0)`, otherwise the
+    ///   write fails.
+    /// - **Postpone-bucket mode** (table option `bucket = -2`, primary-key table whose
+    ///   bucket assignment is deferred): the bucket will be auto-filled with
+    ///   `POSTPONE_BUCKET (-2)`. If the caller does specify a bucket, it MUST be
+    ///   `POSTPONE_BUCKET (-2)`, otherwise the write fails.
+    /// - **Fixed-bucket mode** (table option `bucket > 0`): the caller MUST explicitly
+    ///   call `SetBucket()` with a value in `[0, bucket)`; not calling `SetBucket()`
+    ///   (or passing an out-of-range value) will cause the write to fail.
+    ///
+    /// @param bucket The bucket id for data distribution.
+    RecordBatchBuilder& SetBucket(int32_t bucket);
+
+    /// Build and return the final `RecordBatch` instance.
+    ///
+    /// This method validates the configuration and creates `RecordBatch` with all
+    /// the specified properties. It fails when the builder holds no data, when the data has
+    /// already been released, or when row kinds were set but their count differs from the
+    /// number of records.
+    ///
+    /// @note The builder is reset (data, partition, bucket and row kinds) when this call
+    /// returns, whether it succeeded or not; call `MoveData()` again before reusing it.
+    Result<std::unique_ptr<RecordBatch>> Finish();
+
+    class Impl;
+
+ private:
+    std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/generic_row.h 
b/src/paimon/common/data/generic_row.h
new file mode 100644
index 0000000..7e5da05
--- /dev/null
+++ b/src/paimon/common/data/generic_row.h
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <variant>
+#include <vector>
+
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class Bytes;
+class InternalArray;
+class InternalMap;
+/// An internal data structure representing the data of a row.
+///
+/// GenericRow is a generic implementation of `InternalRow` backed by a vector of `VariantType`
+/// values. A GenericRow can have an arbitrary number of fields of different types. Fields are
+/// addressed by 0-based position, either through the generic `GetField()` or through
+/// type-specific getters (such as `GetInt()`), and are updated with the generic `SetField()`.
+///
+/// @note All fields of this data structure must be internal data structures. See `InternalRow`
+/// for more information about internal data structures. A field may be null to represent
+/// nullability.
+/// @note Positions are only verified with `assert`; callers must pass a valid position.
+/// @note NOTE(review): this class has been described as not supporting nested types (Map,
+/// Array, Struct), yet `GetArray()`, `GetMap()` and `GetRow()` are implemented below — confirm
+/// which statement is intended.
+class GenericRow : public InternalRow {
+ public:
+    /// Creates a row with `arity` fields, all initially null, describing a
+    /// `RowKind::Insert()` in a changelog.
+    /// @note All fields of the row must be internal data structures.
+    /// @param arity number of fields
+    explicit GenericRow(int32_t arity) : GenericRow(RowKind::Insert(), arity) {}
+
+    /// Creates a row of the given kind with `arity` fields, all initially null.
+    /// @note All fields of the row must be internal data structures.
+    /// @param kind kind of change that this row describes in a changelog
+    /// @param arity number of fields
+    GenericRow(const RowKind* kind, int32_t arity) : fields_(arity), kind_(kind) {}
+
+    /// Creates a row whose fields are copies of `values`, in the same order.
+    static std::unique_ptr<GenericRow> Of(const std::vector<VariantType>& values) {
+        auto row = std::make_unique<GenericRow>(static_cast<int32_t>(values.size()));
+        int32_t pos = 0;
+        for (const auto& value : values) {
+            row->SetField(pos++, value);
+        }
+        return row;
+    }
+
+    /// Stores `value` at position `pos`.
+    /// @note The value must be an internal data structure, otherwise the row is corrupted. See
+    /// `InternalRow` for more information. The value can be null to represent nullability.
+    void SetField(int32_t pos, const VariantType& value) {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        fields_[pos] = value;
+    }
+
+    /// @return the value stored at position `pos`, in internal data structure form. The value
+    /// can be null to represent nullability.
+    const VariantType& GetField(int32_t pos) const {
+        return FieldAt(pos);
+    }
+
+    int32_t GetFieldCount() const override {
+        return static_cast<int32_t>(fields_.size());
+    }
+
+    Result<const RowKind*> GetRowKind() const override {
+        return kind_;
+    }
+
+    void SetRowKind(const RowKind* kind) override {
+        kind_ = kind;
+    }
+
+    bool IsNullAt(int32_t pos) const override {
+        return DataDefine::IsVariantNull(FieldAt(pos));
+    }
+
+    /// Keeps `holder` alive for as long as this row lives; holders accumulate.
+    void AddDataHolder(std::unique_ptr<InternalRow>&& holder) {
+        row_holder_.push_back(std::move(holder));
+    }
+
+    /// Keeps `bytes` alive for as long as this row lives.
+    /// NOTE(review): unlike the `InternalRow` overload, this replaces the previously stored
+    /// buffer instead of accumulating — confirm that a single buffer per row is intended.
+    void AddDataHolder(const std::shared_ptr<Bytes>& bytes) {
+        bytes_holder_ = bytes;
+    }
+
+    bool GetBoolean(int32_t pos) const override {
+        return DataDefine::GetVariantValue<bool>(FieldAt(pos));
+    }
+
+    char GetByte(int32_t pos) const override {
+        return DataDefine::GetVariantValue<char>(FieldAt(pos));
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return DataDefine::GetVariantValue<int16_t>(FieldAt(pos));
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return DataDefine::GetVariantValue<int32_t>(FieldAt(pos));
+    }
+
+    /// Dates are read through the same path as 32-bit integers.
+    int32_t GetDate(int32_t pos) const override {
+        return GetInt(pos);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return DataDefine::GetVariantValue<int64_t>(FieldAt(pos));
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return DataDefine::GetVariantValue<float>(FieldAt(pos));
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return DataDefine::GetVariantValue<double>(FieldAt(pos));
+    }
+
+    BinaryString GetString(int32_t pos) const override {
+        return DataDefine::GetVariantValue<BinaryString>(FieldAt(pos));
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return DataDefine::GetStringView(FieldAt(pos));
+    }
+
+    /// `precision` and `scale` are not consulted; the stored value is returned as is.
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override {
+        return DataDefine::GetVariantValue<Decimal>(FieldAt(pos));
+    }
+
+    /// `precision` is not consulted; the stored value is returned as is.
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override {
+        return DataDefine::GetVariantValue<Timestamp>(FieldAt(pos));
+    }
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return DataDefine::GetVariantValue<std::shared_ptr<Bytes>>(FieldAt(pos));
+    }
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override {
+        return DataDefine::GetVariantValue<std::shared_ptr<InternalArray>>(FieldAt(pos));
+    }
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override {
+        return DataDefine::GetVariantValue<std::shared_ptr<InternalMap>>(FieldAt(pos));
+    }
+
+    /// `num_fields` is not consulted; the stored row is returned as is.
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override {
+        return DataDefine::GetVariantValue<std::shared_ptr<InternalRow>>(FieldAt(pos));
+    }
+
+    /// Renders the row as "(f0,f1,...)" using `DataDefine::VariantValueToString` per field.
+    std::string ToString() const override {
+        std::string text = "(";
+        const char* separator = "";
+        for (const auto& field : fields_) {
+            text.append(separator);
+            text.append(DataDefine::VariantValueToString(field));
+            separator = ",";
+        }
+        text.append(")");
+        return text;
+    }
+
+    /// Two rows are equal when their kinds match (same pointer, or both non-null and equal by
+    /// value) and their fields compare equal. Data holders take no part in the comparison.
+    bool operator==(const GenericRow& other) const {
+        if (this == &other) {
+            return true;
+        }
+        const bool same_kind =
+            kind_ == other.kind_ ||
+            (kind_ != nullptr && other.kind_ != nullptr && *kind_ == *other.kind_);
+        if (!same_kind) {
+            return false;
+        }
+        return fields_ == other.fields_;
+    }
+
+ private:
+    /// Bounds-asserting read access shared by all getters.
+    const VariantType& FieldAt(int32_t pos) const {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return fields_[pos];
+    }
+
+    /// The array to store the actual internal format values.
+    std::vector<VariantType> fields_;
+    /// GenericRow only holds string views for string data to avoid deep copies, so the
+    /// original data must be kept alive in the row holders or in the bytes holder.
+    std::vector<std::unique_ptr<InternalRow>> row_holder_;
+    std::shared_ptr<Bytes> bytes_holder_;
+    /// The kind of change that a row describes in a changelog.
+    const RowKind* kind_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/generic_row_test.cpp 
b/src/paimon/common/data/generic_row_test.cpp
new file mode 100644
index 0000000..3d6abde
--- /dev/null
+++ b/src/paimon/common/data/generic_row_test.cpp
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/generic_row.h"
+
+#include "arrow/api.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/columnar/columnar_map.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+
+namespace paimon::test {
+
+// Fills one GenericRow with a value of every supported kind (position 15 is left unset), then
+// checks the field count, the row-kind accessors, every typed getter and the ToString() output.
+TEST(GenericRowTest, TestSimple) {
+    auto pool = GetDefaultPool();
+    GenericRow row(16);
+    // Primitive values.
+    row.SetField(0, true);
+    row.SetField(1, static_cast<char>(1));
+    row.SetField(2, static_cast<int16_t>(2));
+    row.SetField(3, static_cast<int32_t>(3));
+    row.SetField(4, static_cast<int64_t>(4));
+    row.SetField(5, static_cast<float>(5.1));
+    row.SetField(6, 6.12);
+    // String-like values: BinaryString, Bytes and a plain string_view.
+    auto str = BinaryString::FromString("abcd", pool.get());
+    row.SetField(7, str);
+    std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("efgh", pool.get());
+    row.SetField(8, bytes);
+    std::string str9 = "apple";
+    row.SetField(9, std::string_view(str9.data(), str9.size()));
+
+    Timestamp ts(100, 20);
+    row.SetField(10, ts);
+    Decimal decimal(/*precision=*/30, /*scale=*/20,
+                    
DecimalUtils::StrToInt128("12345678998765432145678").value());
+    row.SetField(11, decimal);
+
+    // Nested values: array, row and map.
+    auto array = std::make_shared<BinaryArray>(BinaryArray::FromLongArray(
+        {static_cast<int64_t>(10), static_cast<int64_t>(20)}, pool.get()));
+    row.SetField(12, array);
+
+    std::shared_ptr<InternalRow> binary_row =
+        BinaryRowGenerator::GenerateRowPtr({100, 200}, pool.get());
+    row.SetField(13, binary_row);
+
+    auto key = arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 
2, 3]").ValueOrDie();
+    auto value =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(), "[2, 4, 
6]").ValueOrDie();
+    auto map = std::make_shared<ColumnarMap>(key, value, pool, /*offset=*/0, 
/*length=*/3);
+    row.SetField(14, map);
+    // do not set value at pos 15, therefore, pos 15 is null
+
+    ASSERT_EQ(row.GetFieldCount(), 16);
+    // A row built from the arity-only constructor starts as Insert.
+    ASSERT_EQ(row.GetRowKind().value(), RowKind::Insert());
+    row.SetRowKind(RowKind::Delete());
+    ASSERT_EQ(row.GetRowKind().value(), RowKind::Delete());
+
+    ASSERT_TRUE(row == row);
+    // test get
+    ASSERT_FALSE(row.IsNullAt(0));
+    ASSERT_EQ(row.GetBoolean(0), true);
+    ASSERT_EQ(row.GetByte(1), static_cast<char>(1));
+    ASSERT_EQ(row.GetShort(2), static_cast<int16_t>(2));
+    ASSERT_EQ(row.GetInt(3), static_cast<int32_t>(3));
+    ASSERT_EQ(row.GetDate(3), static_cast<int32_t>(3));
+    ASSERT_EQ(row.GetLong(4), static_cast<int64_t>(4));
+    ASSERT_EQ(row.GetFloat(5), static_cast<float>(5.1));
+    ASSERT_EQ(row.GetDouble(6), static_cast<double>(6.12));
+    ASSERT_EQ(row.GetString(7), str);
+    ASSERT_EQ(*row.GetBinary(8), *bytes);
+    ASSERT_EQ(std::string(row.GetStringView(9)), str9);
+    ASSERT_EQ(row.GetTimestamp(10, /*precision=*/9), ts);
+    ASSERT_EQ(row.GetDecimal(11, /*precision=*/30, /*scale=*/20), decimal);
+    ASSERT_EQ(row.GetArray(12)->ToLongArray().value(), 
array->ToLongArray().value());
+    auto binary_row_result = 
std::dynamic_pointer_cast<BinaryRow>(row.GetRow(13, 2));
+    auto binary_row_expected = 
std::dynamic_pointer_cast<BinaryRow>(binary_row);
+    ASSERT_EQ(*binary_row_result, *binary_row_expected);
+    ASSERT_EQ(row.GetMap(14)->KeyArray()->ToIntArray().value(),
+              map->KeyArray()->ToIntArray().value());
+    ASSERT_EQ(row.GetMap(14)->ValueArray()->ToLongArray().value(),
+              map->ValueArray()->ToLongArray().value());
+    ASSERT_TRUE(row.IsNullAt(15));
+    // Nested values are printed as the placeholders "array", "row" and "map".
+    ASSERT_EQ(
+        "(true,1,2,3,4,5.100000,6.120000,abcd,efgh,apple,1970-01-01 "
+        "00:00:00.100000020,123.45678998765432145678,array,row,map,null)",
+        row.ToString());
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/data/record_batch.cpp 
b/src/paimon/common/data/record_batch.cpp
new file mode 100644
index 0000000..1f1df68
--- /dev/null
+++ b/src/paimon/common/data/record_batch.cpp
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/record_batch.h"
+
+#include <cassert>
+#include <cstddef>
+#include <limits>
+#include <utility>
+
+#include "arrow/c/abi.h"
+#include "arrow/c/helpers.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Private state of `RecordBatchBuilder`: the pending partition, bucket and row kinds, plus a
+/// heap-allocated `ArrowArray` that the builder owns until it is handed over or released.
+class RecordBatchBuilder::Impl {
+ public:
+    friend class RecordBatchBuilder;
+    /// Sentinel bucket value meaning "no bucket specified".
+    static constexpr int32_t INVALID_BUCKET = std::numeric_limits<int32_t>::min();
+
+    /// Releases any array currently held, then takes over the content of `data`.
+    ///
+    /// A null `data` is a programming error and is asserted. When assertions are compiled
+    /// out, the builder is left without data so that `Finish()` reports
+    /// "data is null pointer" rather than dereferencing null here.
+    /// @param data Arrow array whose content is moved into the builder.
+    void MoveData(::ArrowArray* data) {
+        assert(data);
+        ReleaseData();
+        if (data == nullptr) {
+            return;
+        }
+        data_ = new ::ArrowArray();
+        ArrowArrayMove(data, data_);
+    }
+
+    /// Restores the freshly-constructed state: no partition, no bucket, no row kinds, no data.
+    void Reset() {
+        partition_.clear();
+        bucket_ = INVALID_BUCKET;
+        row_kinds_.clear();
+        ReleaseData();
+    }
+
+ private:
+    /// Releases and frees the owned array, if any, and clears the pointer.
+    void ReleaseData() {
+        if (data_) {
+            ArrowArrayRelease(data_);
+            delete data_;
+            data_ = nullptr;
+        }
+    }
+
+    std::map<std::string, std::string> partition_;
+    int32_t bucket_ = INVALID_BUCKET;
+    std::vector<RecordBatch::RowKind> row_kinds_;
+    ::ArrowArray* data_ = nullptr;
+};
+
+/// Creates the private state and moves the content of `data` into it.
+RecordBatchBuilder::RecordBatchBuilder(::ArrowArray* data)
+    : impl_(std::make_unique<RecordBatchBuilder::Impl>()) {
+    impl_->MoveData(data);
+}
+
+/// Releases whatever Arrow data is still held by the builder.
+RecordBatchBuilder::~RecordBatchBuilder() {
+    if (impl_ == nullptr) {
+        return;
+    }
+    impl_->Reset();
+}
+
+/// Replaces the builder's data with the content of `data`; returns the builder for chaining.
+RecordBatchBuilder& RecordBatchBuilder::MoveData(::ArrowArray* data) {
+    Impl& state = *impl_;
+    state.MoveData(data);
+    return *this;
+}
+
+/// Stores a copy of `row_kinds`; returns the builder for chaining.
+RecordBatchBuilder& RecordBatchBuilder::SetRowKinds(
+    const std::vector<RecordBatch::RowKind>& row_kinds) {
+    Impl& state = *impl_;
+    state.row_kinds_ = row_kinds;
+    return *this;
+}
+
+/// Stores a copy of the partition map; returns the builder for chaining.
+RecordBatchBuilder& RecordBatchBuilder::SetPartition(
+    const std::map<std::string, std::string>& partition) {
+    Impl& state = *impl_;
+    state.partition_ = partition;
+    return *this;
+}
+
+/// Records the bucket id; returns the builder for chaining.
+RecordBatchBuilder& RecordBatchBuilder::SetBucket(int32_t bucket) {
+    Impl& state = *impl_;
+    state.bucket_ = bucket;
+    return *this;
+}
+
+/// Validates the accumulated state and builds the `RecordBatch`.
+///
+/// Fails with `Status::Invalid` when no data is held, when the data is already released, or
+/// when row kinds were set and their count differs from the data length. Whatever the
+/// outcome, the builder is reset when this function returns.
+Result<std::unique_ptr<RecordBatch>> RecordBatchBuilder::Finish() {
+    // Clear the builder on every exit path, successful or not.
+    ScopeGuard reset_on_exit([this]() { impl_->Reset(); });
+
+    ::ArrowArray* const array = impl_->data_;
+    if (array == nullptr) {
+        return Status::Invalid("data is null pointer");
+    }
+    if (ArrowArrayIsReleased(array)) {
+        return Status::Invalid("data is released");
+    }
+
+    // An empty list of row kinds is accepted; otherwise there must be one kind per record.
+    const auto& kinds = impl_->row_kinds_;
+    const bool kinds_given = !kinds.empty();
+    if (kinds_given && kinds.size() != static_cast<size_t>(array->length)) {
+        return Status::Invalid(fmt::format("data size {} does not match with row_kinds size {}",
+                                           array->length, kinds.size()));
+    }
+    return std::make_unique<RecordBatch>(impl_->partition_, impl_->bucket_, kinds, array);
+}
+
+/// Copies the partition, bucket and row kinds, and moves the content of `data` into a
+/// heap-allocated `ArrowArray` owned by the batch.
+RecordBatch::RecordBatch(const std::map<std::string, std::string>& partition, int32_t bucket,
+                         const std::vector<RowKind>& row_kinds, ArrowArray* data)
+    : partition_(partition), bucket_(bucket), row_kinds_(row_kinds), data_(new ArrowArray()) {
+    ArrowArrayMove(data, data_);
+}
+
+/// Releases and frees the owned Arrow array; a moved-from batch owns nothing.
+RecordBatch::~RecordBatch() {
+    if (data_ == nullptr) {
+        return;
+    }
+    ArrowArrayRelease(data_);
+    delete data_;
+}
+
+/// Move constructor: takes the partition, row kinds and Arrow array from `other` and copies
+/// its bucket. `other` is left without data, so its destructor releases nothing.
+///
+/// Members are initialised directly in declaration order. A self-comparison is unnecessary
+/// because a constructor can never run on an object that already is its own source.
+RecordBatch::RecordBatch(RecordBatch&& other)
+    : partition_(std::move(other.partition_)),
+      bucket_(other.bucket_),
+      row_kinds_(std::move(other.row_kinds_)),
+      data_(std::exchange(other.data_, nullptr)) {}
+
+/// Move assignment: releases the array currently owned, then takes over the state of `other`,
+/// which is left without data. Self-assignment is a no-op.
+RecordBatch& RecordBatch::operator=(RecordBatch&& other) {
+    if (this != &other) {
+        // Give up the array owned so far before adopting the incoming one.
+        if (data_ != nullptr) {
+            ArrowArrayRelease(data_);
+            delete data_;
+        }
+        data_ = other.data_;
+        other.data_ = nullptr;
+        bucket_ = other.bucket_;
+        partition_ = std::move(other.partition_);
+        row_kinds_ = std::move(other.row_kinds_);
+    }
+    return *this;
+}
+
+/// @return true unless the bucket still equals the builder's "not specified" sentinel.
+bool RecordBatch::HasSpecifiedBucket() const {
+    const bool is_unset = (bucket_ == RecordBatchBuilder::Impl::INVALID_BUCKET);
+    return !is_unset;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/data/record_batch_test.cpp 
b/src/paimon/common/data/record_batch_test.cpp
new file mode 100644
index 0000000..4b50322
--- /dev/null
+++ b/src/paimon/common/data/record_batch_test.cpp
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/record_batch.h"
+
+#include <utility>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/json_simple.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Builds a 10-row struct array, checks that Finish() rejects a row-kind list of the wrong
+// length, then reuses the builder with fresh data and exercises the move operations.
+TEST(RecordBatchTest, TestSimple) {
+    // prepare an arrow array with struct<col1:string,col2:int32,col3:int64,col4:bool>
+    auto string_field = arrow::field("col1", arrow::utf8());
+    auto int_field = arrow::field("col2", arrow::int32());
+    auto long_field = arrow::field("col3", arrow::int64());
+    auto bool_field = arrow::field("col4", arrow::boolean());
+
+    auto struct_type = arrow::struct_({string_field, int_field, long_field, 
bool_field});
+    auto schema =
+        arrow::schema(arrow::FieldVector({string_field, int_field, long_field, 
bool_field}));
+
+    arrow::StructBuilder struct_builder(
+        struct_type, arrow::default_memory_pool(),
+        {std::make_shared<arrow::StringBuilder>(), 
std::make_shared<arrow::Int32Builder>(),
+         std::make_shared<arrow::Int64Builder>(), 
std::make_shared<arrow::BooleanBuilder>()});
+    auto string_builder = 
static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+    auto int_builder = 
static_cast<arrow::Int32Builder*>(struct_builder.field_builder(1));
+    auto long_builder = 
static_cast<arrow::Int64Builder*>(struct_builder.field_builder(2));
+    auto bool_builder = 
static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(3));
+    for (int32_t i = 0; i < 10; ++i) {
+        ASSERT_TRUE(struct_builder.Append().ok());
+        ASSERT_TRUE(string_builder->Append("20240813").ok());
+        ASSERT_TRUE(int_builder->Append(23).ok());
+        ASSERT_TRUE(long_builder->Append(static_cast<int64_t>(1722848484308ll 
+ i)).ok());
+        ASSERT_TRUE(bool_builder->Append(static_cast<bool>(i % 2)).ok());
+    }
+    std::shared_ptr<arrow::Array> array;
+    ASSERT_TRUE(struct_builder.Finish(&array).ok());
+    ::ArrowArray arrow_array;
+    ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok());
+    RecordBatchBuilder batch_builder(&arrow_array);
+    std::map<std::string, std::string> partition = {{"col1", "20240813"}, 
{"col2", "23"}};
+    // Two row kinds for ten rows: Finish() must fail.
+    ASSERT_NOK(batch_builder.SetPartition(partition)
+                   .SetRowKinds({RecordBatch::RowKind::INSERT, 
RecordBatch::RowKind::INSERT})
+                   .Finish());
+    // Finish() reset the builder, so data and partition are supplied again before retrying.
+    ::ArrowArray arrow_array2;
+    ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array2).ok());
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch2,
+                         
batch_builder.MoveData(&arrow_array2).SetPartition(partition).Finish());
+
+    // Move construction keeps the partition.
+    RecordBatch batch3 = std::move(*batch2);
+    ASSERT_EQ(batch3.GetPartition(), partition);
+
+    RecordBatch batch4(std::move(batch3));
+    ASSERT_EQ(batch4.GetPartition(), partition);
+}
+
+// Move-assigns between two directly constructed batches in both directions and checks that
+// the target receives the bucket while the source is left without data.
+TEST(RecordBatchTest, TestAssignAndMove) {
+    arrow::FieldVector fields = {arrow::field("f0", arrow::boolean()),
+                                 arrow::field("f1", arrow::int8())};
+    std::map<std::string, std::string> partition = {{"f1", "1"}};
+    auto old_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+        [true, 1]
+    ])")
+            .ValueOrDie());
+    ::ArrowArray old_arrow_array;
+    ASSERT_TRUE(arrow::ExportArray(*old_array, &old_arrow_array).ok());
+    RecordBatch old_batch(partition, /*bucket=*/0, 
{RecordBatch::RowKind::INSERT},
+                          &old_arrow_array);
+
+    auto new_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+        [false, 1]
+    ])")
+            .ValueOrDie());
+    ::ArrowArray new_arrow_array;
+    ASSERT_TRUE(arrow::ExportArray(*new_array, &new_arrow_array).ok());
+    RecordBatch new_batch(partition, /*bucket=*/1, 
{RecordBatch::RowKind::INSERT},
+                          &new_arrow_array);
+
+    // Assigning over a batch that already owns data.
+    old_batch = std::move(new_batch);
+    ASSERT_EQ(old_batch.GetBucket(), 1);
+    ASSERT_FALSE(
+        new_batch.GetData());  // NOLINT(bugprone-use-after-move, clang-analyzer-cplusplus.Move)
+    // Assigning back into the moved-from batch, which owns no data.
+    new_batch = std::move(old_batch);
+    ASSERT_EQ(new_batch.GetBucket(), 1);
+    ASSERT_FALSE(
+        old_batch.GetData());  // NOLINT(bugprone-use-after-move, clang-analyzer-cplusplus.Move)
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/options/memory_size.cpp 
b/src/paimon/common/options/memory_size.cpp
new file mode 100644
index 0000000..b89d170
--- /dev/null
+++ b/src/paimon/common/options/memory_size.cpp
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/options/memory_size.h"
+
+#include <cstddef>
+#include <limits>
+#include <optional>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Unit descriptor for plain bytes: selected by "b" or "bytes", multiplier 1.
+const MemorySize::MemoryUnit& MemorySize::Bytes() {
+    static const MemoryUnit unit(std::vector<std::string>{"b", "bytes"}, 1L);
+    return unit;
+}
+
+/// Unit descriptor for kibibytes: selected by "k", "kb" or "kibibytes", multiplier 1024.
+const MemorySize::MemoryUnit& MemorySize::KiloBytes() {
+    static const MemoryUnit unit(std::vector<std::string>{"k", "kb", "kibibytes"}, 1024L);
+    return unit;
+}
+
+/// Unit descriptor for mebibytes: selected by "m", "mb" or "mebibytes", multiplier 1024^2.
+const MemorySize::MemoryUnit& MemorySize::MegaBytes() {
+    static const MemoryUnit unit(std::vector<std::string>{"m", "mb", "mebibytes"},
+                                 1024L * 1024L);
+    return unit;
+}
+
+/// Unit descriptor for gibibytes: selected by "g", "gb" or "gibibytes", multiplier 1024^3.
+const MemorySize::MemoryUnit& MemorySize::GigaBytes() {
+    static const MemoryUnit unit(std::vector<std::string>{"g", "gb", "gibibytes"},
+                                 1024L * 1024L * 1024L);
+    return unit;
+}
+
+/// Unit descriptor for tebibytes: selected by "t", "tb" or "tebibytes", multiplier 1024^4.
+const MemorySize::MemoryUnit& MemorySize::TeraBytes() {
+    // The product is 2^40 and must be computed in int64_t: `long` is only 32 bits wide on LLP64
+    // targets, where `1024L * 1024L * 1024L * 1024L` would overflow.
+    static const MemoryUnit kTeraBytes{{"t", "tb", "tebibytes"},
+                                       int64_t{1024} * 1024 * 1024 * 1024};
+    return kTeraBytes;
+}
+
+/// Parses a memory-size expression such as "64mb" or " 10 K " into a number of bytes.
+///
+/// The text is trimmed, split into a leading run of ASCII digits and a unit suffix, and the
+/// suffix is trimmed and lower-cased before being resolved with ParseUnit().
+///
+/// @param text Expression to parse.
+/// @return The size in bytes, or Status::Invalid when the text is empty, does not start with a
+///         digit, names an unrecognized unit, or the value does not fit into int64_t.
+Result<int64_t> MemorySize::ParseBytes(const std::string& text) {
+    std::string input = text;
+    StringUtils::Trim(&input);
+    if (input.empty()) {
+        return Status::Invalid("argument is an empty or whitespace-only string");
+    }
+
+    // Locate the end of the leading digit run; everything after it is the unit suffix.
+    size_t digits_end = input.find_first_not_of("0123456789");
+    if (digits_end == std::string::npos) {
+        digits_end = input.length();
+    }
+    std::string number = input.substr(0, digits_end);
+    if (number.empty()) {
+        return Status::Invalid("text does not start with a number");
+    }
+    std::string suffix = input.substr(digits_end);
+    StringUtils::Trim(&suffix);
+    std::string unit = StringUtils::ToLowerCase(suffix);
+
+    // `number` holds digits only, so a failed conversion means it exceeds the int64_t range.
+    std::optional<int64_t> amount = StringUtils::StringToValue<int64_t>(number);
+    if (!amount.has_value()) {
+        return Status::Invalid(fmt::format(
+            "The value '{}' cannot be represented as 64bit number (numeric overflow).", number));
+    }
+    PAIMON_ASSIGN_OR_RAISE(MemoryUnit parsed_unit, ParseUnit(unit));
+    const int64_t multiplier = parsed_unit.GetMultiplier();
+
+    // Reject amounts whose product with the multiplier would leave the int64_t range.
+    if (*amount > std::numeric_limits<int64_t>::max() / multiplier) {
+        return Status::Invalid(fmt::format(
+            "The value '{}' cannot be represented as 64bit number of bytes (numeric overflow).",
+            text));
+    }
+    return *amount * multiplier;
+}
+
+/// Resolves a unit suffix to its MemoryUnit.
+///
+/// @param unit Suffix to look up; compared verbatim against each unit's suffix list.
+/// @return The matching unit, Bytes() when `unit` is empty, or Status::Invalid for any other
+///         suffix that matches none of the units.
+Result<MemorySize::MemoryUnit> MemorySize::ParseUnit(const std::string& unit) {
+    // Probed in ascending size order; the first unit listing `unit` wins.
+    const MemoryUnit* const candidates[] = {&Bytes(), &KiloBytes(), &MegaBytes(), &GigaBytes(),
+                                            &TeraBytes()};
+    for (const MemoryUnit* candidate : candidates) {
+        if (MatchesAny(unit, *candidate)) {
+            return *candidate;
+        }
+    }
+    if (unit.empty()) {
+        // No suffix at all: the value is taken as bytes.
+        return Bytes();
+    }
+    return Status::Invalid(
+        fmt::format("Memory size unit '{}' does not match any of the recognized units", unit));
+}
+
+/// Reports whether `str` is exactly equal to one of the suffixes listed by `unit`.
+bool MemorySize::MatchesAny(const std::string& str, const MemorySize::MemoryUnit& unit) {
+    const std::vector<std::string>& aliases = unit.GetUnits();
+    for (size_t i = 0; i < aliases.size(); ++i) {
+        if (aliases[i] == str) {
+            return true;
+        }
+    }
+    return false;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/options/memory_size.h b/src/paimon/common/options/memory_size.h
new file mode 100644
index 0000000..1908c31
--- /dev/null
+++ b/src/paimon/common/options/memory_size.h
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Static helpers that turn a textual memory-size expression into a number of bytes.
+///
+/// <h2>Parsing</h2>
+///
+/// The size is parsed from a text expression made of a number and an optional unit suffix. If
+/// the expression is a pure number, the value is interpreted as bytes.
+class PAIMON_EXPORT MemorySize {
+ public:
+    // Static-only utility: instances can neither be created nor destroyed.
+    MemorySize() = delete;
+    ~MemorySize() = delete;
+
+    /// Describes one memory unit: the suffixes that select it and the number of bytes it stands
+    /// for. Mostly used to parse values from a configuration file.
+    ///
+    /// Supported suffixes:
+    ///
+    /// <ul>
+    /// <li>1b or 1bytes (bytes)
+    /// <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
+    /// <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
+    /// <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
+    /// <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
+    /// </ul>
+    class MemoryUnit {
+     public:
+        /// @param units Suffixes that select this unit.
+        /// @param multiplier Number of bytes represented by one of this unit.
+        MemoryUnit(const std::vector<std::string>& units, int64_t multiplier)
+            : units_(units), multiplier_(multiplier) {}
+
+        /// Suffixes that select this unit.
+        const std::vector<std::string>& GetUnits() const { return units_; }
+
+        /// Number of bytes represented by one of this unit.
+        int64_t GetMultiplier() const { return multiplier_; }
+
+     private:
+        std::vector<std::string> units_;
+        int64_t multiplier_;
+    };
+
+    /// Accessors for the supported units, from smallest to largest.
+    static const MemoryUnit& Bytes();
+    static const MemoryUnit& KiloBytes();
+    static const MemoryUnit& MegaBytes();
+    static const MemoryUnit& GigaBytes();
+    static const MemoryUnit& TeraBytes();
+
+    /// Parses `text` (number plus optional unit suffix) into a number of bytes.
+    static Result<int64_t> ParseBytes(const std::string& text);
+    /// Resolves a unit suffix to its MemoryUnit; an empty suffix yields Bytes().
+    static Result<MemoryUnit> ParseUnit(const std::string& unit);
+    /// Returns true when `str` equals one of the suffixes of `unit`.
+    static bool MatchesAny(const std::string& str, const MemorySize::MemoryUnit& unit);
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/options/memory_size_test.cpp b/src/paimon/common/options/memory_size_test.cpp
new file mode 100644
index 0000000..97096b8
--- /dev/null
+++ b/src/paimon/common/options/memory_size_test.cpp
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/options/memory_size.h"
+
+#include <limits>
+
+#include "gtest/gtest.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Every lower-case suffix of every unit, without whitespace, applied to the amount 1024.
+TEST(MemorySizeTest, TestParseBytes) {
+    struct Case {
+        const char* text;
+        int64_t expected_bytes;
+    };
+    const Case cases[] = {
+        // no suffix / bytes
+        {"1024", 1024L},
+        {"1024b", 1024},
+        {"1024bytes", 1024L},
+        // kibibytes
+        {"1024k", 1048576L},
+        {"1024kb", 1048576L},
+        {"1024kibibytes", 1048576L},
+        // mebibytes
+        {"1024m", 1073741824L},
+        {"1024mb", 1073741824L},
+        {"1024mebibytes", 1073741824L},
+        // gibibytes
+        {"1024g", 1099511627776L},
+        {"1024gb", 1099511627776L},
+        {"1024gibibytes", 1099511627776L},
+        // tebibytes
+        {"1024t", 1125899906842624L},
+        {"1024tb", 1125899906842624L},
+        {"1024tebibytes", 1125899906842624L},
+    };
+    for (const Case& c : cases) {
+        ASSERT_OK_AND_ASSIGN(int64_t size, MemorySize::ParseBytes(c.text));
+        ASSERT_EQ(size, c.expected_bytes);
+    }
+}
+
+// Suffixes in mixed case, with whitespace before the number, between number and suffix, and
+// after the suffix.
+TEST(MemorySizeTest, TestParseBytesUpperCaseAndWithSpace) {
+    struct Case {
+        const char* text;
+        int64_t expected_bytes;
+    };
+    const Case cases[] = {
+        {"1024 B", 1024},
+        {"1024Bytes ", 1024L},
+        {"1024 K", 1048576L},
+        {"1024Kb ", 1048576L},
+        {"1024kiBIbyTes", 1048576L},
+        {"1024M", 1073741824L},
+        {"1024mB", 1073741824L},
+        {"1024 meBIbyteS", 1073741824L},
+        {"1024G", 1099511627776L},
+        {"1024 GB", 1099511627776L},
+        {"1024gIbibytes", 1099511627776L},
+        {"1024   T", 1125899906842624L},
+        {" 1024Tb ", 1125899906842624L},
+        {"1024tebiBytes ", 1125899906842624L},
+    };
+    for (const Case& c : cases) {
+        ASSERT_OK_AND_ASSIGN(int64_t size, MemorySize::ParseBytes(c.text));
+        ASSERT_EQ(size, c.expected_bytes);
+    }
+}
+
+TEST(MemorySizeTest, TestInvalidInput) {
+    ASSERT_NOK_WITH_MSG(MemorySize::ParseBytes(""),
+                        "argument is an empty or whitespace-only string");
+    ASSERT_NOK_WITH_MSG(MemorySize::ParseBytes("1 SomeUnknownUnit"),
+                        "does not match any of the recognized units");
+    ASSERT_NOK_WITH_MSG(MemorySize::ParseBytes("-1b"), "text does not start 
with a number");
+
+    uint64_t overflow_num = std::numeric_limits<uint64_t>::max();
+    ASSERT_NOK_WITH_MSG(MemorySize::ParseBytes(std::to_string(overflow_num) + 
" b"),
+                        "cannot be represented as 64bit number (numeric 
overflow)");
+    int64_t overflow_num_2 = std::numeric_limits<int64_t>::max();
+    ASSERT_NOK_WITH_MSG(MemorySize::ParseBytes(std::to_string(overflow_num_2) 
+ " kb"),
+                        "cannot be represented as 64bit number of bytes 
(numeric overflow)");
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/options/time_duration.cpp b/src/paimon/common/options/time_duration.cpp
new file mode 100644
index 0000000..f48b830
--- /dev/null
+++ b/src/paimon/common/options/time_duration.cpp
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/options/time_duration.h"
+
+#include <cstddef>
+#include <limits>
+#include <optional>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Nanosecond unit: a value is divided by 1000 * 1000 to express it in milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::NanoSeconds() {
+    static const TimeUnit unit(
+        std::vector<std::string>{"ns", "nano", "nanos", "nanosecond", "nanoseconds"},
+        /*coefficient=*/1000 * 1000L, TimeUnit::CoefficientOperator::DIVIDE);
+    return unit;
+}
+
+/// Microsecond unit: a value is divided by 1000 to express it in milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::MicroSeconds() {
+    static const TimeUnit unit(
+        std::vector<std::string>{"us", "µs", "micro", "micros", "microsecond", "microseconds"},
+        /*coefficient=*/1000L, TimeUnit::CoefficientOperator::DIVIDE);
+    return unit;
+}
+
+/// Millisecond unit: the base unit, with a multiplying coefficient of 1.
+const TimeDuration::TimeUnit& TimeDuration::MilliSeconds() {
+    static const TimeUnit unit(
+        std::vector<std::string>{"ms", "milli", "millis", "millisecond", "milliseconds"},
+        /*coefficient=*/1, TimeUnit::CoefficientOperator::MULTIPLY);
+    return unit;
+}
+
+/// Second unit: a value is multiplied by 1000 to express it in milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::Seconds() {
+    static const TimeUnit unit(std::vector<std::string>{"s", "sec", "secs", "second", "seconds"},
+                               /*coefficient=*/1000L, TimeUnit::CoefficientOperator::MULTIPLY);
+    return unit;
+}
+
+/// Minute unit: a value is multiplied by 1000 * 60 to express it in milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::Minutes() {
+    static const TimeUnit unit(std::vector<std::string>{"min", "m", "minute", "minutes"},
+                               /*coefficient=*/1000L * 60,
+                               TimeUnit::CoefficientOperator::MULTIPLY);
+    return unit;
+}
+
+/// Hour unit: a value is multiplied by 1000 * 60 * 60 to express it in milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::Hours() {
+    static const TimeUnit unit(std::vector<std::string>{"h", "hour", "hours"},
+                               /*coefficient=*/1000L * 60 * 60,
+                               TimeUnit::CoefficientOperator::MULTIPLY);
+    return unit;
+}
+
+/// Day unit: a value is multiplied by 1000 * 60 * 60 * 24 to express it in milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::Days() {
+    static const TimeUnit unit(std::vector<std::string>{"d", "day", "days"},
+                               /*coefficient=*/1000L * 60 * 60 * 24,
+                               TimeUnit::CoefficientOperator::MULTIPLY);
+    return unit;
+}
+
+/// Parses a duration expression such as "5 min" or "250ms" into a number of milliseconds.
+///
+/// The text is trimmed, split into a leading run of ASCII digits and a unit suffix, and the
+/// suffix is trimmed and lower-cased before being resolved with ParseUnit(). Units smaller than
+/// a millisecond use integer division, so any remainder is dropped.
+///
+/// @param text Expression to parse.
+/// @return The duration in milliseconds, or Status::Invalid when the text is empty, does not
+///         start with a digit, names an unrecognized unit, or the value does not fit into
+///         int64_t.
+Result<int64_t> TimeDuration::Parse(const std::string& text) {
+    std::string input = text;
+    StringUtils::Trim(&input);
+    if (input.empty()) {
+        return Status::Invalid("argument is an empty or whitespace-only string");
+    }
+
+    // Locate the end of the leading digit run; everything after it is the unit suffix.
+    size_t digits_end = input.find_first_not_of("0123456789");
+    if (digits_end == std::string::npos) {
+        digits_end = input.length();
+    }
+    std::string number = input.substr(0, digits_end);
+    if (number.empty()) {
+        return Status::Invalid("text does not start with a number");
+    }
+    std::string suffix = input.substr(digits_end);
+    StringUtils::Trim(&suffix);
+    std::string unit = StringUtils::ToLowerCase(suffix);
+
+    // `number` holds digits only, so a failed conversion means it exceeds the int64_t range.
+    std::optional<int64_t> amount = StringUtils::StringToValue<int64_t>(number);
+    if (!amount.has_value()) {
+        return Status::Invalid(fmt::format(
+            "The value '{}' cannot be represented as 64bit number (numeric overflow).", number));
+    }
+    PAIMON_ASSIGN_OR_RAISE(TimeUnit parsed_unit, ParseUnit(unit));
+    const int64_t coefficient = parsed_unit.GetCoefficient();
+    const bool divides =
+        parsed_unit.GetCoefficientOperator() == TimeUnit::CoefficientOperator::DIVIDE;
+
+    // Only multiplication can overflow; for dividing units the limit is the full int64_t range.
+    const int64_t maximum = std::numeric_limits<int64_t>::max() / (divides ? 1 : coefficient);
+    if (*amount > maximum) {
+        return Status::Invalid(
+            fmt::format("The value '{}' cannot be represented as 64bit number of milliseconds "
+                        "(numeric overflow).",
+                        text));
+    }
+    return divides ? *amount / coefficient : *amount * coefficient;
+}
+
+/// Resolves a unit suffix to its TimeUnit.
+///
+/// @param unit Suffix to look up; compared verbatim against each unit's suffix list.
+/// @return The matching unit, MilliSeconds() when `unit` is empty, or Status::Invalid for any
+///         other suffix that matches none of the units.
+Result<TimeDuration::TimeUnit> TimeDuration::ParseUnit(const std::string& unit) {
+    // Probed from the smallest to the largest unit; the first unit listing `unit` wins.
+    const TimeUnit* const candidates[] = {&NanoSeconds(), &MicroSeconds(), &MilliSeconds(),
+                                          &Seconds(),     &Minutes(),      &Hours(),
+                                          &Days()};
+    for (const TimeUnit* candidate : candidates) {
+        if (MatchesAny(unit, *candidate)) {
+            return *candidate;
+        }
+    }
+    if (unit.empty()) {
+        // No suffix at all: the value is taken as milliseconds.
+        return MilliSeconds();
+    }
+    return Status::Invalid(
+        fmt::format("Time duration unit '{}' does not match any of the recognized units", unit));
+}
+
+/// Reports whether `str` is exactly equal to one of the suffixes listed by `unit`.
+bool TimeDuration::MatchesAny(const std::string& str, const TimeDuration::TimeUnit& unit) {
+    const std::vector<std::string>& aliases = unit.GetUnits();
+    for (size_t i = 0; i < aliases.size(); ++i) {
+        if (aliases[i] == str) {
+            return true;
+        }
+    }
+    return false;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/options/time_duration.h b/src/paimon/common/options/time_duration.h
new file mode 100644
index 0000000..e1f93e0
--- /dev/null
+++ b/src/paimon/common/options/time_duration.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Static helpers that turn a textual time-duration expression into a number of milliseconds.
+///
+/// <h2>Parsing</h2>
+///
+/// The duration is parsed from a text expression made of a number and an optional unit suffix.
+/// If the expression is a pure number, the value is interpreted as milliseconds.
+class TimeDuration {
+ public:
+    // Static-only utility: instances can neither be created nor destroyed.
+    TimeDuration() = delete;
+    ~TimeDuration() = delete;
+
+    /// Describes one time unit: the suffixes that select it and how a value in that unit is
+    /// converted to milliseconds. Mostly used to parse values from a configuration file.
+    ///
+    /// Supported suffixes:
+    ///
+    /// <ul>
+    /// <li>1ns or 1nano or 1nanos or 1nanosecond or 1nanoseconds
+    /// <li>1us or 1µs or 1micro or 1micros or 1microsecond or 1microseconds
+    /// <li>1ms or 1milli or 1millis or 1millisecond or 1milliseconds
+    /// <li>1s or 1sec or 1secs or 1second or 1seconds
+    /// <li>1min or 1m or 1minute or 1minutes
+    /// <li>1h or 1hour or 1hours
+    /// <li>1d or 1day or 1days
+    /// </ul>
+    class TimeUnit {
+     public:
+        /// Whether a value is multiplied or divided by the coefficient to obtain milliseconds.
+        enum class CoefficientOperator { MULTIPLY, DIVIDE };
+
+        /// @param units Suffixes that select this unit.
+        /// @param coefficient Factor between this unit and milliseconds.
+        /// @param op How `coefficient` is applied to a value.
+        TimeUnit(const std::vector<std::string>& units, int64_t coefficient, CoefficientOperator op)
+            : units_(units), coefficient_(coefficient), op_(op) {}
+
+        /// Suffixes that select this unit.
+        const std::vector<std::string>& GetUnits() const { return units_; }
+
+        /// Factor between this unit and milliseconds.
+        int64_t GetCoefficient() const { return coefficient_; }
+
+        /// How the coefficient is applied to a value.
+        CoefficientOperator GetCoefficientOperator() const { return op_; }
+
+     private:
+        std::vector<std::string> units_;
+        int64_t coefficient_;
+        CoefficientOperator op_;
+    };
+
+    /// Accessors for the supported units, from smallest to largest.
+    static const TimeUnit& NanoSeconds();
+    static const TimeUnit& MicroSeconds();
+    static const TimeUnit& MilliSeconds();
+    static const TimeUnit& Seconds();
+    static const TimeUnit& Minutes();
+    static const TimeUnit& Hours();
+    static const TimeUnit& Days();
+
+    /// Parses `text` (number plus optional unit suffix) into a number of milliseconds.
+    static Result<int64_t> Parse(const std::string& text);
+    /// Resolves a unit suffix to its TimeUnit; an empty suffix yields MilliSeconds().
+    static Result<TimeUnit> ParseUnit(const std::string& unit);
+    /// Returns true when `str` equals one of the suffixes of `unit`.
+    static bool MatchesAny(const std::string& str, const TimeDuration::TimeUnit& unit);
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/options/time_duration_test.cpp b/src/paimon/common/options/time_duration_test.cpp
new file mode 100644
index 0000000..1804481
--- /dev/null
+++ b/src/paimon/common/options/time_duration_test.cpp
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/options/time_duration.h"
+
+#include "gtest/gtest.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Every suffix of every unit (some preceded by a space), plus the rejection paths.
+TEST(TimeDurationTest, TestParseTime) {
+    // Less than one millisecond worth of nanoseconds comes out as zero.
+    ASSERT_OK_AND_ASSIGN(int64_t time, TimeDuration::Parse("1000 ns"));
+    ASSERT_EQ(time, 0L);
+
+    // Each group applies one number to all suffixes of a unit and expects the same result.
+    struct UnitGroup {
+        const char* number;
+        std::vector<std::string> suffixes;
+        int64_t expected_millis;
+    };
+    const std::vector<UnitGroup> groups = {
+        {"123456789", {"ns", " ns", "nano", " nano", "nanos", "nanosecond", "nanoseconds"}, 123L},
+        {"123456789",
+         {"us", " us", "µs", "micro", "micros", "microsecond", "microseconds"},
+         123456L},
+        {"1000", {"ms", "milli", " milli", "millis", "millisecond", "milliseconds"}, 1000L},
+        {"1000", {"s", "sec", "secs", " second", "seconds"}, 1000000L},
+        {"1000", {" min", "m", "minute", "minutes"}, 60000000L},
+        {"1000", {"h", "hour", " hours"}, 3600000000L},
+        {"1000", {"d", "day", " days"}, 86400000000L},
+    };
+    for (const UnitGroup& group : groups) {
+        for (const std::string& suffix : group.suffixes) {
+            ASSERT_OK_AND_ASSIGN(time, TimeDuration::Parse(std::string(group.number) + suffix));
+            ASSERT_EQ(time, group.expected_millis);
+        }
+    }
+
+    // without time unit, default time unit is milli
+    ASSERT_OK_AND_ASSIGN(time, TimeDuration::Parse("1000"));
+    ASSERT_EQ(time, 1000L);
+
+    // with invalid time unit
+    ASSERT_NOK_WITH_MSG(TimeDuration::Parse("1000ss"),
+                        "Time duration unit 'ss' does not match any of the recognized units");
+    // with invalid time duration
+    ASSERT_NOK_WITH_MSG(TimeDuration::Parse(""), "argument is an empty or whitespace-only string");
+    ASSERT_NOK_WITH_MSG(TimeDuration::Parse("       "),
+                        "argument is an empty or whitespace-only string");
+    ASSERT_NOK_WITH_MSG(TimeDuration::Parse("ns"), "text does not start with a number");
+}
+
+// For each multiplying unit: the largest accepted amount and the amount one above it, which
+// must be rejected.
+TEST(TimeDurationTest, TestBoundaryCheck) {
+    ASSERT_OK_AND_ASSIGN(int64_t time, TimeDuration::Parse("1000 us"));
+    ASSERT_EQ(time, 1L);
+
+    struct Boundary {
+        const char* largest_accepted;
+        const char* first_rejected;
+    };
+    const Boundary boundaries[] = {
+        {"106751991167d", "106751991168d"},
+        {"2562047788015h", "2562047788016h"},
+        {"153722867280912m", "153722867280913m"},
+        {"9223372036854775s", "9223372036854776s"},
+        {"9223372036854775807ms", "9223372036854775808ms"},
+    };
+    for (const Boundary& boundary : boundaries) {
+        ASSERT_OK(TimeDuration::Parse(boundary.largest_accepted));
+        ASSERT_NOK(TimeDuration::Parse(boundary.first_rejected));
+    }
+}
+}  // namespace paimon::test

Reply via email to