This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new f9008f2 feat: add GenericRow, RecordBatch, MemorySize and TimeDuration (#28)
f9008f2 is described below
commit f9008f2111359df84f82df33f54dc888ab0e4f31
Author: lszskye <[email protected]>
AuthorDate: Fri May 29 11:13:00 2026 +0800
feat: add GenericRow, RecordBatch, MemorySize and TimeDuration (#28)
Squash merge PR #28.
---
include/paimon/record_batch.h | 159 +++++++++++++++
src/paimon/common/data/generic_row.h | 237 +++++++++++++++++++++++
src/paimon/common/data/generic_row_test.cpp | 110 +++++++++++
src/paimon/common/data/record_batch.cpp | 163 ++++++++++++++++
src/paimon/common/data/record_batch_test.cpp | 122 ++++++++++++
src/paimon/common/options/memory_size.cpp | 130 +++++++++++++
src/paimon/common/options/memory_size.h | 82 ++++++++
src/paimon/common/options/memory_size_test.cpp | 136 +++++++++++++
src/paimon/common/options/time_duration.cpp | 162 ++++++++++++++++
src/paimon/common/options/time_duration.h | 92 +++++++++
src/paimon/common/options/time_duration_test.cpp | 87 +++++++++
11 files changed, 1480 insertions(+)
diff --git a/include/paimon/record_batch.h b/include/paimon/record_batch.h
new file mode 100644
index 0000000..9ea8a4b
--- /dev/null
+++ b/include/paimon/record_batch.h
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+struct ArrowArray;
+
+namespace paimon {
+/// `RecordBatch` encapsulates a batch of data with the same schema, supporting different types such
+/// as `INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, and `DELETE`. It is typically used in streaming
+/// write or batch processing scenarios, with underlying data stored in the Apache Arrow format.
+/// @note Do not use this class directly, use `RecordBatchBuilder` to build a `RecordBatch` which
+/// has input validation.
+class PAIMON_EXPORT RecordBatch {
+ public:
+    /// Kind of change carried by a single row of the batch.
+    enum class PAIMON_EXPORT RowKind : int8_t {
+        INSERT = 0,
+        UPDATE_BEFORE = 1,
+        UPDATE_AFTER = 2,
+        DELETE = 3,
+    };
+
+    /// Creates a batch that takes over the contents of `data`.
+    ///
+    /// @note 1. Data cannot be reused, as it will be released after Write. 2. If a partition
+    /// field's value is null, it should be represented as "__DEFAULT_PARTITION__"(or a user-defined
+    /// default value) in the partition map. However, in the Arrow array, partition column values
+    /// MUST NOT be set to "__DEFAULT_PARTITION__" (or a user-defined default value). Instead, they
+    /// should be properly set as actual nulls. If used, it may lead to behavioral inconsistencies
+    /// between C++ Paimon and Java Paimon.
+    ///
+    /// @param partition Map of partition column names to their string values.
+    /// @param bucket Bucket id of the batch.
+    /// @param row_kinds Row kind of each record.
+    /// @param data Arrow array whose contents are moved into this batch; must not be null.
+    RecordBatch(const std::map<std::string, std::string>& partition, int32_t bucket,
+                const std::vector<RowKind>& row_kinds, ArrowArray* data);
+    ~RecordBatch();
+
+    // Movable: the source batch gives up its Arrow array and reports null from `GetData()`.
+    RecordBatch(RecordBatch&&);
+    RecordBatch& operator=(RecordBatch&&);
+
+    // Not copyable: the batch is the single owner of its Arrow array.
+    RecordBatch(const RecordBatch&) = delete;
+    RecordBatch& operator=(const RecordBatch&) = delete;
+
+    /// @return the partition map given at construction.
+    const std::map<std::string, std::string>& GetPartition() const {
+        return partition_;
+    }
+
+    /// @return the bucket id currently stored in this batch.
+    int32_t GetBucket() const {
+        return bucket_;
+    }
+
+    /// @return the Arrow array held by this batch (still owned by the batch), or null once the
+    /// batch has been moved from.
+    ArrowArray* GetData() const {
+        return data_;
+    }
+
+    /// @return the row kinds given at construction.
+    const std::vector<RecordBatch::RowKind>& GetRowKind() const {
+        return row_kinds_;
+    }
+
+    /// Overwrites the stored bucket id.
+    void SetBucket(int32_t bucket) {
+        bucket_ = bucket;
+    }
+
+    /// @return false when the bucket id is still the "not specified" sentinel used by
+    /// `RecordBatchBuilder`, true otherwise.
+    bool HasSpecifiedBucket() const;
+
+ private:
+    std::map<std::string, std::string> partition_;
+    int32_t bucket_;
+    std::vector<RecordBatch::RowKind> row_kinds_;
+    ::ArrowArray* data_;
+};
+
+/// Builder for constructing `RecordBatch` instances.
+///
+/// This class provides a convenient way to build `RecordBatch` objects by setting
+/// various properties such as data, row kinds, partition information, and bucket id.
+class PAIMON_EXPORT RecordBatchBuilder {
+ public:
+    /// Constructs a `RecordBatchBuilder` with Arrow data
+    ///
+    /// @note The `data` must conform to table schema:
+    ///       - Each array in `data` corresponds to a field in table schema.
+    ///       - If a field in table schema is marked as non-nullable (`nullable = false`),
+    ///         the corresponding array in `data` must have zero null entries.
+    ///
+    /// @note Consistency between `data` and table schema will be validated during the write
+    /// process.
+    ///
+    /// @param data ArrowArray struct containing the columnar data (via C Data Interface)
+    explicit RecordBatchBuilder(::ArrowArray* data);
+
+    ~RecordBatchBuilder();
+
+    /// Move new Arrow data into the builder, replacing existing data.
+    /// @param data New Arrow array data.
+    RecordBatchBuilder& MoveData(::ArrowArray* data);
+
+    /// Set the row kinds for each record in the batch.
+    /// @param row_kinds A vector of row kinds, including INSERT, UPDATE_BEFORE, UPDATE_AFTER and
+    /// DELETE. If not set, default value is `INSERT`.
+    /// @note `row_kinds` must have the same length as the number of records in the data.
+    RecordBatchBuilder& SetRowKinds(const std::vector<RecordBatch::RowKind>& row_kinds);
+
+    /// Set the partition information for this record batch.
+    /// @param partition Map of partition column names to their string values.
+    RecordBatchBuilder& SetPartition(const std::map<std::string, std::string>& partition);
+
+    /// Set the bucket id for this record batch. If not set, default value is `-2147483648`
+    /// (i.e., `HasSpecifiedBucket()` returns false), and the bucket will be auto-resolved
+    /// at write time based on the table's bucket option:
+    ///
+    /// - **Unaware-bucket mode** (table option `bucket = -1`, append-only table without
+    ///   primary keys): the bucket will be auto-filled with `UNAWARE_BUCKET (0)`. If the
+    ///   caller does specify a bucket, it MUST be `UNAWARE_BUCKET (0)`, otherwise the
+    ///   write fails.
+    /// - **Postpone-bucket mode** (table option `bucket = -2`, primary-key table whose
+    ///   bucket assignment is deferred): the bucket will be auto-filled with
+    ///   `POSTPONE_BUCKET (-2)`. If the caller does specify a bucket, it MUST be
+    ///   `POSTPONE_BUCKET (-2)`, otherwise the write fails.
+    /// - **Fixed-bucket mode** (table option `bucket > 0`): the caller MUST explicitly
+    ///   call `SetBucket()` with a value in `[0, bucket)`; not calling `SetBucket()`
+    ///   (or passing an out-of-range value) will cause the write to fail.
+    ///
+    /// @param bucket The bucket id for data distribution.
+    RecordBatchBuilder& SetBucket(int32_t bucket);
+
+    /// Build and return the final `RecordBatch` instance.
+    ///
+    /// This method validates the configuration and creates `RecordBatch` with all
+    /// the specified properties.
+    ///
+    /// @note The builder is reset when this call returns, whether it succeeded or not; call
+    /// `MoveData()` again before building another batch.
+    Result<std::unique_ptr<RecordBatch>> Finish();
+
+    class Impl;
+
+ private:
+    std::unique_ptr<Impl> impl_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/data/generic_row.h b/src/paimon/common/data/generic_row.h
new file mode 100644
index 0000000..7e5da05
--- /dev/null
+++ b/src/paimon/common/data/generic_row.h
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <variant>
+#include <vector>
+
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class Bytes;
+class InternalArray;
+class InternalMap;
+/// An internal data structure representing data of row.
+///
+/// GenericRow is a generic implementation of `InternalRow` which is backed by an array of
+/// VariantType. A GenericRow can have an arbitrary number of fields of different types. The fields
+/// in a row can be accessed by position (0-based) using either the generic `GetField()` or
+/// type-specific getters (such as `GetInt()`). A field can be updated by the generic `SetField()`.
+///
+/// @note All fields of this data structure must be internal data structures. See `InternalRow`
+/// for more information about internal data structures. The fields in GenericRow can be null for
+/// representing nullability. Note that GenericRow does not support nested type (i.e., Map, Array,
+/// Struct).
+/// NOTE(review): the class nevertheless exposes `GetArray()`, `GetMap()` and `GetRow()`; confirm
+/// what exactly "does not support nested type" is meant to exclude.
+class GenericRow : public InternalRow {
+ public:
+    /// Creates an instance of GenericRow with given number of fields.
+    /// Initially, all fields are set to null. By default, the row describes a
+    /// `RowKind::Insert()` in a changelog.
+    /// @note All fields of the row must be internal data structures.
+    /// @param arity number of fields
+    explicit GenericRow(int32_t arity) : GenericRow(RowKind::Insert(), arity) {}
+
+    /// Creates an instance of GenericRow with given kind and number of fields.
+    /// Initially, all fields are set to null.
+    /// @note All fields of the row must be internal data structures.
+    /// @param kind kind of change that this row describes in a changelog
+    /// @param arity number of fields, must not be negative
+    GenericRow(const RowKind* kind, int32_t arity) : kind_(kind) {
+        // A negative arity would otherwise wrap around to an enormous size_t.
+        assert(arity >= 0);
+        fields_.resize(static_cast<size_t>(arity));
+    }
+
+    /// Sets the field value at the given position.
+    /// @note The given field value must be an internal data structures. Otherwise the
+    /// GenericRow is corrupted. See `InternalRow` for more information about internal data
+    /// structures. The field value can be null for representing nullability.
+    void SetField(int32_t pos, const VariantType& value) {
+        // The unsigned cast makes a negative `pos` fail the same bound check.
+        assert(static_cast<size_t>(pos) < fields_.size());
+        fields_[pos] = value;
+    }
+
+    /// @return the field value at the given position.
+    /// @note The returned value is in internal data structure. See `InternalRow`
+    /// for more information about internal data structures.
+    /// The returned field value can be null for representing nullability.
+    const VariantType& GetField(int32_t pos) const {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return fields_[pos];
+    }
+
+    /// @return the number of fields in this row.
+    int32_t GetFieldCount() const override {
+        return static_cast<int32_t>(fields_.size());
+    }
+
+    /// @return the kind of change this row describes.
+    Result<const RowKind*> GetRowKind() const override {
+        return kind_;
+    }
+
+    /// Replaces the kind of change this row describes.
+    void SetRowKind(const RowKind* kind) override {
+        kind_ = kind;
+    }
+
+    /// @return whether the field at `pos` holds the null variant.
+    bool IsNullAt(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::IsVariantNull(fields_[pos]);
+    }
+
+    /// Keeps `holder` alive for as long as this row exists. Holders accumulate.
+    void AddDataHolder(std::unique_ptr<InternalRow>&& holder) {
+        row_holder_.push_back(std::move(holder));
+    }
+
+    /// Keeps `bytes` alive for as long as this row exists.
+    /// @note Only one bytes holder is stored: a second call replaces the previous one.
+    void AddDataHolder(const std::shared_ptr<Bytes>& bytes) {
+        bytes_holder_ = bytes;
+    }
+
+    // Type-specific getters: each reads the variant stored at `pos` as the requested type.
+
+    bool GetBoolean(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<bool>(fields_[pos]);
+    }
+
+    char GetByte(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<char>(fields_[pos]);
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<int16_t>(fields_[pos]);
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<int32_t>(fields_[pos]);
+    }
+
+    /// Dates share the int32 representation, so this simply forwards to `GetInt()`.
+    int32_t GetDate(int32_t pos) const override {
+        return GetInt(pos);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<int64_t>(fields_[pos]);
+    }
+
+    float GetFloat(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<float>(fields_[pos]);
+    }
+
+    double GetDouble(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<double>(fields_[pos]);
+    }
+
+    BinaryString GetString(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<BinaryString>(fields_[pos]);
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetStringView(fields_[pos]);
+    }
+
+    /// `precision` and `scale` are not consulted; the stored `Decimal` is returned as is.
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<Decimal>(fields_[pos]);
+    }
+
+    /// `precision` is not consulted; the stored `Timestamp` is returned as is.
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<Timestamp>(fields_[pos]);
+    }
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<std::shared_ptr<Bytes>>(fields_[pos]);
+    }
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<std::shared_ptr<InternalArray>>(fields_[pos]);
+    }
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<std::shared_ptr<InternalMap>>(fields_[pos]);
+    }
+
+    /// `num_fields` is not consulted; the stored row pointer is returned as is.
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override {
+        assert(static_cast<size_t>(pos) < fields_.size());
+        return DataDefine::GetVariantValue<std::shared_ptr<InternalRow>>(fields_[pos]);
+    }
+
+    /// @return the fields rendered as "(f0,f1,...)" using `DataDefine::VariantValueToString`.
+    std::string ToString() const override {
+        std::string ret = "(";
+        for (size_t i = 0; i < fields_.size(); i++) {
+            if (i != 0) {
+                ret.append(",");
+            }
+            ret.append(DataDefine::VariantValueToString(fields_[i]));
+        }
+        ret.append(")");
+        return ret;
+    }
+
+    /// Two rows are equal when their kinds match (same pointer, or both non-null and equal by
+    /// value) and their fields compare equal. Data holders do not take part in the comparison.
+    bool operator==(const GenericRow& other) const {
+        if (this == &other) {
+            return true;
+        }
+        bool kind_equal =
+            (kind_ == other.kind_) || (kind_ && other.kind_ && *kind_ == *other.kind_);
+        return kind_equal && fields_ == other.fields_;
+    }
+
+    /// Creates an instance of GenericRow with given field values.
+    static std::unique_ptr<GenericRow> Of(const std::vector<VariantType>& values) {
+        auto row = std::make_unique<GenericRow>(static_cast<int32_t>(values.size()));
+        for (size_t i = 0; i < values.size(); i++) {
+            row->SetField(static_cast<int32_t>(i), values[i]);
+        }
+        return row;
+    }
+
+ private:
+    /// The array to store the actual internal format values.
+    std::vector<VariantType> fields_;
+    /// As GenericRow only holds string view for string data to avoid deep copy, original data must
+    /// be held in the row holders or the bytes holder.
+    std::vector<std::unique_ptr<InternalRow>> row_holder_;
+    std::shared_ptr<Bytes> bytes_holder_;
+    /// The kind of change that a row describes in a changelog.
+    const RowKind* kind_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/data/generic_row_test.cpp b/src/paimon/common/data/generic_row_test.cpp
new file mode 100644
index 0000000..3d6abde
--- /dev/null
+++ b/src/paimon/common/data/generic_row_test.cpp
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/data/generic_row.h"
+
+#include "arrow/api.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/columnar/columnar_map.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+
+namespace paimon::test {
+
+// Fills one field of every supported kind, then checks the typed getters, null detection,
+// row-kind handling and the textual rendering.
+TEST(GenericRowTest, TestSimple) {
+    auto pool = GetDefaultPool();
+    GenericRow row(16);
+    row.SetField(0, true);
+    row.SetField(1, static_cast<char>(1));
+    row.SetField(2, static_cast<int16_t>(2));
+    row.SetField(3, static_cast<int32_t>(3));
+    row.SetField(4, static_cast<int64_t>(4));
+    row.SetField(5, static_cast<float>(5.1));
+    row.SetField(6, 6.12);
+    auto str = BinaryString::FromString("abcd", pool.get());
+    row.SetField(7, str);
+    std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("efgh", pool.get());
+    row.SetField(8, bytes);
+    // `str9` outlives `row`, so the stored string_view stays valid.
+    std::string str9 = "apple";
+    row.SetField(9, std::string_view(str9.data(), str9.size()));
+
+    Timestamp ts(100, 20);
+    row.SetField(10, ts);
+    Decimal decimal(/*precision=*/30, /*scale=*/20,
+                    DecimalUtils::StrToInt128("12345678998765432145678").value());
+    row.SetField(11, decimal);
+
+    auto array = std::make_shared<BinaryArray>(BinaryArray::FromLongArray(
+        {static_cast<int64_t>(10), static_cast<int64_t>(20)}, pool.get()));
+    row.SetField(12, array);
+
+    std::shared_ptr<InternalRow> binary_row =
+        BinaryRowGenerator::GenerateRowPtr({100, 200}, pool.get());
+    row.SetField(13, binary_row);
+
+    auto key = arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 2, 3]").ValueOrDie();
+    auto value =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(), "[2, 4, 6]").ValueOrDie();
+    auto map = std::make_shared<ColumnarMap>(key, value, pool, /*offset=*/0, /*length=*/3);
+    row.SetField(14, map);
+    // do not set value at pos 15, therefore, pos 15 is null
+
+    ASSERT_EQ(row.GetFieldCount(), 16);
+    ASSERT_EQ(row.GetRowKind().value(), RowKind::Insert());
+    row.SetRowKind(RowKind::Delete());
+    ASSERT_EQ(row.GetRowKind().value(), RowKind::Delete());
+
+    ASSERT_TRUE(row == row);
+    // test get
+    ASSERT_FALSE(row.IsNullAt(0));
+    ASSERT_EQ(row.GetBoolean(0), true);
+    ASSERT_EQ(row.GetByte(1), static_cast<char>(1));
+    ASSERT_EQ(row.GetShort(2), static_cast<int16_t>(2));
+    ASSERT_EQ(row.GetInt(3), static_cast<int32_t>(3));
+    ASSERT_EQ(row.GetDate(3), static_cast<int32_t>(3));
+    ASSERT_EQ(row.GetLong(4), static_cast<int64_t>(4));
+    ASSERT_EQ(row.GetFloat(5), static_cast<float>(5.1));
+    ASSERT_EQ(row.GetDouble(6), static_cast<double>(6.12));
+    ASSERT_EQ(row.GetString(7), str);
+    ASSERT_EQ(*row.GetBinary(8), *bytes);
+    ASSERT_EQ(std::string(row.GetStringView(9)), str9);
+    ASSERT_EQ(row.GetTimestamp(10, /*precision=*/9), ts);
+    ASSERT_EQ(row.GetDecimal(11, /*precision=*/30, /*scale=*/20), decimal);
+    ASSERT_EQ(row.GetArray(12)->ToLongArray().value(), array->ToLongArray().value());
+    auto binary_row_result = std::dynamic_pointer_cast<BinaryRow>(row.GetRow(13, 2));
+    auto binary_row_expected = std::dynamic_pointer_cast<BinaryRow>(binary_row);
+    ASSERT_EQ(*binary_row_result, *binary_row_expected);
+    ASSERT_EQ(row.GetMap(14)->KeyArray()->ToIntArray().value(),
+              map->KeyArray()->ToIntArray().value());
+    ASSERT_EQ(row.GetMap(14)->ValueArray()->ToLongArray().value(),
+              map->ValueArray()->ToLongArray().value());
+    ASSERT_TRUE(row.IsNullAt(15));
+    ASSERT_EQ(
+        "(true,1,2,3,4,5.100000,6.120000,abcd,efgh,apple,1970-01-01 "
+        "00:00:00.100000020,123.45678998765432145678,array,row,map,null)",
+        row.ToString());
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/data/record_batch.cpp b/src/paimon/common/data/record_batch.cpp
new file mode 100644
index 0000000..1f1df68
--- /dev/null
+++ b/src/paimon/common/data/record_batch.cpp
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/record_batch.h"
+
+#include <cassert>
+#include <cstddef>
+#include <limits>
+#include <utility>
+
+#include "arrow/c/abi.h"
+#include "arrow/c/helpers.h"
+#include "fmt/format.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Private state of `RecordBatchBuilder`: the settings collected so far plus a heap-allocated
+/// `ArrowArray` that receives the data moved into the builder.
+class RecordBatchBuilder::Impl {
+ public:
+    friend class RecordBatchBuilder;
+    /// Sentinel bucket id meaning that no bucket has been specified.
+    static constexpr int32_t INVALID_BUCKET = std::numeric_limits<int32_t>::min();
+
+    /// Releases whatever array is currently held, then takes over the contents of `data`.
+    /// @param data Source array; expected to be non-null.
+    void MoveData(::ArrowArray* data) {
+        assert(data);
+        ReleaseData();
+        if (data == nullptr) {
+            // With assertions compiled out, leave `data_` null so that `Finish()` reports
+            // "data is null pointer" instead of reading through a null source.
+            return;
+        }
+        data_ = new ::ArrowArray();
+        ArrowArrayMove(data, data_);
+    }
+
+    /// Returns every setting to its initial value and drops the held array.
+    void Reset() {
+        partition_.clear();
+        bucket_ = INVALID_BUCKET;
+        row_kinds_.clear();
+        ReleaseData();
+    }
+
+ private:
+    /// Releases and frees the held array, if any, leaving `data_` null.
+    void ReleaseData() {
+        if (data_) {
+            ArrowArrayRelease(data_);
+            delete data_;
+            data_ = nullptr;
+        }
+    }
+
+    std::map<std::string, std::string> partition_;
+    int32_t bucket_ = INVALID_BUCKET;
+    std::vector<RecordBatch::RowKind> row_kinds_;
+    ::ArrowArray* data_ = nullptr;
+};
+
+// Creates the private state first, then hands it the caller's array.
+RecordBatchBuilder::RecordBatchBuilder(::ArrowArray* data)
+    : impl_(std::make_unique<Impl>()) {
+    Impl& state = *impl_;
+    state.MoveData(data);
+}
+
+// Drops any array still held by the builder; nothing to do without private state.
+RecordBatchBuilder::~RecordBatchBuilder() {
+    if (!impl_) {
+        return;
+    }
+    impl_->Reset();
+}
+
+// Swaps in new Arrow data (the previous array is released) and returns the builder for chaining.
+RecordBatchBuilder& RecordBatchBuilder::MoveData(::ArrowArray* data) {
+    Impl& state = *impl_;
+    state.MoveData(data);
+    return *this;
+}
+
+// Stores a copy of `row_kinds`; the length is only checked later, in `Finish()`.
+RecordBatchBuilder& RecordBatchBuilder::SetRowKinds(
+    const std::vector<RecordBatch::RowKind>& row_kinds) {
+    auto& stored_kinds = impl_->row_kinds_;
+    stored_kinds = row_kinds;
+    return *this;
+}
+
+// Stores a copy of the partition map and returns the builder for chaining.
+RecordBatchBuilder& RecordBatchBuilder::SetPartition(
+    const std::map<std::string, std::string>& partition) {
+    auto& stored_partition = impl_->partition_;
+    stored_partition = partition;
+    return *this;
+}
+
+// Records the bucket id as given (no range check here) and returns the builder for chaining.
+RecordBatchBuilder& RecordBatchBuilder::SetBucket(int32_t bucket) {
+    Impl& state = *impl_;
+    state.bucket_ = bucket;
+    return *this;
+}
+
+// Validates the collected settings and builds the `RecordBatch`.
+//
+// Returns `Status::Invalid` when no data is held, when the held array is already released, or
+// when row kinds were supplied but their count differs from the array length. An empty
+// `row_kinds_` is accepted without a length check.
+Result<std::unique_ptr<RecordBatch>> RecordBatchBuilder::Finish() {
+    // The guard is expected to run `Reset()` on every exit path, so the builder ends up cleared
+    // whether or not a batch was produced.
+    ScopeGuard guard([this]() { impl_->Reset(); });
+    if (impl_->data_ == nullptr) {
+        return Status::Invalid("data is null pointer");
+    }
+    if (ArrowArrayIsReleased(impl_->data_)) {
+        return Status::Invalid("data is released");
+    }
+    if (!impl_->row_kinds_.empty() &&
+        impl_->row_kinds_.size() != static_cast<size_t>(impl_->data_->length)) {
+        return Status::Invalid(fmt::format("data size {} does not match with row_kinds size {}",
+                                           impl_->data_->length, impl_->row_kinds_.size()));
+    }
+    // The `RecordBatch` constructor moves the array contents out of `impl_->data_`; the guard
+    // then frees the emptied struct.
+    return std::make_unique<RecordBatch>(impl_->partition_, impl_->bucket_, impl_->row_kinds_,
+                                         impl_->data_);
+}
+
+// Copies the partition map and row kinds, then moves the contents of `data` into a freshly
+// allocated `ArrowArray` owned by this batch.
+RecordBatch::RecordBatch(const std::map<std::string, std::string>& partition, int32_t bucket,
+                         const std::vector<RowKind>& row_kinds, ArrowArray* data)
+    : partition_(partition), bucket_(bucket), row_kinds_(row_kinds), data_(new ArrowArray()) {
+    ArrowArrayMove(data, data_);
+}
+
+// Releases and frees the owned array; a moved-from batch holds none.
+RecordBatch::~RecordBatch() {
+    if (data_ == nullptr) {
+        return;
+    }
+    ArrowArrayRelease(data_);
+    delete data_;
+}
+
+// Move constructor: takes the other batch's members directly through the initializer list and
+// leaves `other` without an array, so its destructor has nothing to release.
+RecordBatch::RecordBatch(RecordBatch&& other)
+    : partition_(std::move(other.partition_)),
+      bucket_(other.bucket_),
+      row_kinds_(std::move(other.row_kinds_)),
+      data_(other.data_) {
+    other.data_ = nullptr;
+}
+
+// Move assignment: takes over the other batch's members, releasing the array this batch held
+// before. Self-assignment leaves the object untouched.
+RecordBatch& RecordBatch::operator=(RecordBatch&& other) {
+    if (this != &other) {
+        partition_ = std::move(other.partition_);
+        bucket_ = other.bucket_;
+        row_kinds_ = std::move(other.row_kinds_);
+        if (data_ != nullptr) {
+            ArrowArrayRelease(data_);
+            delete data_;
+        }
+        data_ = other.data_;
+        other.data_ = nullptr;
+    }
+    return *this;
+}
+
+// True unless the bucket id is still the builder's "not specified" sentinel.
+bool RecordBatch::HasSpecifiedBucket() const {
+    const bool is_unset = (bucket_ == RecordBatchBuilder::Impl::INVALID_BUCKET);
+    return !is_unset;
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/data/record_batch_test.cpp b/src/paimon/common/data/record_batch_test.cpp
new file mode 100644
index 0000000..4b50322
--- /dev/null
+++ b/src/paimon/common/data/record_batch_test.cpp
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/record_batch.h"
+
+#include <utility>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/json_simple.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Builds a ten-row struct array, checks that a row-kind count mismatch is rejected, then builds
+// a valid batch and moves it around.
+TEST(RecordBatchTest, TestSimple) {
+    // prepare an arrow array with struct<col1:string,col2:int32,col3:int64,col4:bool>
+    auto string_field = arrow::field("col1", arrow::utf8());
+    auto int_field = arrow::field("col2", arrow::int32());
+    auto long_field = arrow::field("col3", arrow::int64());
+    auto bool_field = arrow::field("col4", arrow::boolean());
+
+    auto struct_type = arrow::struct_({string_field, int_field, long_field, bool_field});
+
+    arrow::StructBuilder struct_builder(
+        struct_type, arrow::default_memory_pool(),
+        {std::make_shared<arrow::StringBuilder>(), std::make_shared<arrow::Int32Builder>(),
+         std::make_shared<arrow::Int64Builder>(), std::make_shared<arrow::BooleanBuilder>()});
+    auto string_builder = static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+    auto int_builder = static_cast<arrow::Int32Builder*>(struct_builder.field_builder(1));
+    auto long_builder = static_cast<arrow::Int64Builder*>(struct_builder.field_builder(2));
+    auto bool_builder = static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(3));
+    for (int32_t i = 0; i < 10; ++i) {
+        ASSERT_TRUE(struct_builder.Append().ok());
+        ASSERT_TRUE(string_builder->Append("20240813").ok());
+        ASSERT_TRUE(int_builder->Append(23).ok());
+        ASSERT_TRUE(long_builder->Append(static_cast<int64_t>(1722848484308ll + i)).ok());
+        ASSERT_TRUE(bool_builder->Append(static_cast<bool>(i % 2)).ok());
+    }
+    std::shared_ptr<arrow::Array> array;
+    ASSERT_TRUE(struct_builder.Finish(&array).ok());
+    ::ArrowArray arrow_array;
+    ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok());
+    RecordBatchBuilder batch_builder(&arrow_array);
+    std::map<std::string, std::string> partition = {{"col1", "20240813"}, {"col2", "23"}};
+    // Two row kinds for ten rows: Finish() must fail.
+    ASSERT_NOK(batch_builder.SetPartition(partition)
+                   .SetRowKinds({RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT})
+                   .Finish());
+    // The failed Finish() cleared the builder, so data and partition are supplied again.
+    ::ArrowArray arrow_array2;
+    ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array2).ok());
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch2,
+                         batch_builder.MoveData(&arrow_array2).SetPartition(partition).Finish());
+
+    RecordBatch batch3 = std::move(*batch2);
+    ASSERT_EQ(batch3.GetPartition(), partition);
+
+    RecordBatch batch4(std::move(batch3));
+    ASSERT_EQ(batch4.GetPartition(), partition);
+}
+
+// Move-assigns two directly constructed batches back and forth and checks that the source
+// gives up its Arrow array each time.
+TEST(RecordBatchTest, TestAssignAndMove) {
+    arrow::FieldVector fields = {arrow::field("f0", arrow::boolean()),
+                                 arrow::field("f1", arrow::int8())};
+    std::map<std::string, std::string> partition = {{"f1", "1"}};
+    auto old_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+        [true, 1]
+    ])")
+            .ValueOrDie());
+    ::ArrowArray old_arrow_array;
+    ASSERT_TRUE(arrow::ExportArray(*old_array, &old_arrow_array).ok());
+    RecordBatch old_batch(partition, /*bucket=*/0, {RecordBatch::RowKind::INSERT},
+                          &old_arrow_array);
+
+    auto new_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+        [false, 1]
+    ])")
+            .ValueOrDie());
+    ::ArrowArray new_arrow_array;
+    ASSERT_TRUE(arrow::ExportArray(*new_array, &new_arrow_array).ok());
+    RecordBatch new_batch(partition, /*bucket=*/1, {RecordBatch::RowKind::INSERT},
+                          &new_arrow_array);
+
+    old_batch = std::move(new_batch);
+    ASSERT_EQ(old_batch.GetBucket(), 1);
+    // The moved-from batch is inspected on purpose.
+    // NOLINTNEXTLINE(bugprone-use-after-move, clang-analyzer-cplusplus.Move)
+    ASSERT_FALSE(new_batch.GetData());
+    new_batch = std::move(old_batch);
+    ASSERT_EQ(new_batch.GetBucket(), 1);
+    // NOLINTNEXTLINE(bugprone-use-after-move, clang-analyzer-cplusplus.Move)
+    ASSERT_FALSE(old_batch.GetData());
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/options/memory_size.cpp b/src/paimon/common/options/memory_size.cpp
new file mode 100644
index 0000000..b89d170
--- /dev/null
+++ b/src/paimon/common/options/memory_size.cpp
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/options/memory_size.h"
+
+#include <cstddef>
+#include <limits>
+#include <optional>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Returns the unit for plain bytes (spellings "b" and "bytes", multiplier 1).
+const MemorySize::MemoryUnit& MemorySize::Bytes() {
+    // Function-local static: constructed on the first call, the same object is returned afterwards.
+    static const MemoryUnit byte_unit(std::vector<std::string>{"b", "bytes"}, int64_t{1});
+    return byte_unit;
+}
+
+/// Returns the unit for kibibytes (spellings "k", "kb" and "kibibytes", multiplier 1024).
+const MemorySize::MemoryUnit& MemorySize::KiloBytes() {
+    // Function-local static: constructed on the first call, the same object is returned afterwards.
+    static const MemoryUnit kibi_unit(std::vector<std::string>{"k", "kb", "kibibytes"},
+                                      int64_t{1} << 10);
+    return kibi_unit;
+}
+
+/// Returns the unit for mebibytes (spellings "m", "mb" and "mebibytes", multiplier 1024^2).
+const MemorySize::MemoryUnit& MemorySize::MegaBytes() {
+    // Function-local static: constructed on the first call, the same object is returned afterwards.
+    static const MemoryUnit mebi_unit(std::vector<std::string>{"m", "mb", "mebibytes"},
+                                      int64_t{1} << 20);
+    return mebi_unit;
+}
+
+/// Returns the unit for gibibytes (spellings "g", "gb" and "gibibytes", multiplier 1024^3).
+const MemorySize::MemoryUnit& MemorySize::GigaBytes() {
+    // Function-local static: constructed on the first call, the same object is returned afterwards.
+    static const MemoryUnit gibi_unit(std::vector<std::string>{"g", "gb", "gibibytes"},
+                                      int64_t{1} << 30);
+    return gibi_unit;
+}
+
+/// Returns the unit for tebibytes (spellings "t", "tb" and "tebibytes", multiplier 1024^4).
+const MemorySize::MemoryUnit& MemorySize::TeraBytes() {
+    // The multiplier is 2^40 and must be computed in int64_t: with `long` operands
+    // (1024L * 1024L * 1024L * 1024L) the product overflows on data models where
+    // `long` is 32 bits wide (e.g. LLP64 / 64-bit Windows), which is undefined behaviour.
+    static const MemoryUnit kTeraBytes{{"t", "tb", "tebibytes"},
+                                       int64_t{1024} * 1024 * 1024 * 1024};
+    return kTeraBytes;
+}
+
+/// Parses a memory size expression such as "1024", "64 kb" or "1G" into a number of bytes.
+///
+/// The trimmed input is split into a leading run of ASCII digits and a unit suffix. The suffix
+/// is trimmed and lower-cased, then resolved with ParseUnit().
+///
+/// @param text the expression to parse.
+/// @return the size in bytes, or Status::Invalid when the text is empty or whitespace-only, does
+///         not start with a digit, carries a number that StringUtils::StringToValue<int64_t>
+///         cannot convert, names an unrecognized unit, or exceeds int64_t once multiplied by the
+///         unit's multiplier.
+Result<int64_t> MemorySize::ParseBytes(const std::string& text) {
+    std::string input = text;
+    StringUtils::Trim(&input);
+    if (input.empty()) {
+        return Status::Invalid("argument is an empty or whitespace-only string");
+    }
+
+    // Count the leading digits; everything after them is the unit suffix.
+    size_t digit_count = 0;
+    for (; digit_count < input.length(); ++digit_count) {
+        const char c = input[digit_count];
+        if (c < '0' || c > '9') {
+            break;
+        }
+    }
+    std::string number = input.substr(0, digit_count);
+    if (number.empty()) {
+        return Status::Invalid("text does not start with a number");
+    }
+
+    std::string suffix = input.substr(digit_count);
+    StringUtils::Trim(&suffix);
+    std::string unit = StringUtils::ToLowerCase(suffix);
+
+    std::optional<int64_t> value = StringUtils::StringToValue<int64_t>(number);
+    if (!value.has_value()) {
+        return Status::Invalid(fmt::format(
+            "The value '{}' cannot be represented as 64bit number (numeric overflow).", number));
+    }
+    PAIMON_ASSIGN_OR_RAISE(MemoryUnit parsed_unit, ParseUnit(unit));
+    const int64_t multiplier = parsed_unit.GetMultiplier();
+
+    // Largest number that can still be multiplied by `multiplier` without overflowing int64_t.
+    const int64_t largest_scalable = std::numeric_limits<int64_t>::max() / multiplier;
+    if (*value > largest_scalable) {
+        return Status::Invalid(fmt::format(
+            "The value '{}' cannot be represented as 64bit number of bytes (numeric overflow).",
+            text));
+    }
+    return *value * multiplier;
+}
+
+/// Resolves a unit spelling to its MemoryUnit.
+///
+/// @param unit the spelling to look up; it is compared verbatim against each unit's spellings.
+/// @return the first matching unit (checked in the order bytes, kibi, mebi, gibi, tebi); Bytes()
+///         when `unit` is empty; Status::Invalid for any other unmatched text.
+Result<MemorySize::MemoryUnit> MemorySize::ParseUnit(const std::string& unit) {
+    // Accessors are called one at a time so a unit is only touched when the search reaches it.
+    using UnitAccessor = const MemoryUnit& (*)();
+    const UnitAccessor accessors[] = {&MemorySize::Bytes, &MemorySize::KiloBytes,
+                                      &MemorySize::MegaBytes, &MemorySize::GigaBytes,
+                                      &MemorySize::TeraBytes};
+    for (const UnitAccessor accessor : accessors) {
+        const MemoryUnit& candidate = accessor();
+        if (MatchesAny(unit, candidate)) {
+            return candidate;
+        }
+    }
+    if (unit.empty()) {
+        // No suffix at all: the number is taken as bytes.
+        return Bytes();
+    }
+    return Status::Invalid(
+        fmt::format("Memory size unit '{}' does not match any of the recognized units", unit));
+}
+
+/// Returns true when `str` is equal to one of the spellings listed by `unit`.
+bool MemorySize::MatchesAny(const std::string& str, const MemorySize::MemoryUnit& unit) {
+    const auto& spellings = unit.GetUnits();
+    auto it = spellings.begin();
+    // Advance until a spelling equals `str` or the list is exhausted.
+    while (it != spellings.end() && !(*it == str)) {
+        ++it;
+    }
+    return it != spellings.end();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/options/memory_size.h
b/src/paimon/common/options/memory_size.h
new file mode 100644
index 0000000..1908c31
--- /dev/null
+++ b/src/paimon/common/options/memory_size.h
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Static helpers for reading a memory size, expressed as a number of bytes, from text.
+///
+/// Parsing: an expression is a run of digits optionally followed by a unit, for example
+/// "1024", "64 kb" or "1G". An expression without a unit is interpreted as bytes.
+class PAIMON_EXPORT MemorySize {
+ public:
+    // Only static members are offered; the class itself cannot be instantiated.
+    MemorySize() = delete;
+    ~MemorySize() = delete;
+
+    /// A memory unit: the spellings it is recognized by plus its size in bytes. Mostly used to
+    /// parse values from a configuration file.
+    ///
+    /// Supported suffixes:
+    ///  - 1b or 1bytes (bytes)
+    ///  - 1k or 1kb or 1kibibytes (kibibytes = 1024 bytes)
+    ///  - 1m or 1mb or 1mebibytes (mebibytes = 1024 kibibytes)
+    ///  - 1g or 1gb or 1gibibytes (gibibytes = 1024 mebibytes)
+    ///  - 1t or 1tb or 1tebibytes (tebibytes = 1024 gibibytes)
+    class MemoryUnit {
+     public:
+        /// @param units spellings this unit is matched against.
+        /// @param multiplier factor applied to a parsed number to obtain bytes.
+        MemoryUnit(const std::vector<std::string>& units, int64_t multiplier)
+            : units_(units), multiplier_(multiplier) {}
+
+        /// Spellings this unit is matched against.
+        const std::vector<std::string>& GetUnits() const { return units_; }
+
+        /// Factor applied to a parsed number to obtain bytes.
+        int64_t GetMultiplier() const { return multiplier_; }
+
+     private:
+        std::vector<std::string> units_;
+        int64_t multiplier_;
+    };
+
+    /// Accessors for the supported units.
+    static const MemoryUnit& Bytes();
+    static const MemoryUnit& KiloBytes();
+    static const MemoryUnit& MegaBytes();
+    static const MemoryUnit& GigaBytes();
+    static const MemoryUnit& TeraBytes();
+
+    /// Parses `text` (digits plus optional unit) into a number of bytes.
+    static Result<int64_t> ParseBytes(const std::string& text);
+    /// Maps a unit spelling to its MemoryUnit.
+    static Result<MemoryUnit> ParseUnit(const std::string& unit);
+    /// Tells whether `str` equals one of the spellings of `unit`.
+    static bool MatchesAny(const std::string& str, const MemorySize::MemoryUnit& unit);
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/options/memory_size_test.cpp
b/src/paimon/common/options/memory_size_test.cpp
new file mode 100644
index 0000000..97096b8
--- /dev/null
+++ b/src/paimon/common/options/memory_size_test.cpp
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/options/memory_size.h"
+
+#include <limits>
+
+#include "gtest/gtest.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Each lowercase spelling of each unit (and no unit at all), applied to the number 1024.
+TEST(MemorySizeTest, TestParseBytes) {
+    struct Case {
+        const char* text;
+        int64_t expected_bytes;
+    };
+    const Case cases[] = {
+        {"1024", 1024L},
+        {"1024b", 1024L},
+        {"1024bytes", 1024L},
+        {"1024k", 1048576L},
+        {"1024kb", 1048576L},
+        {"1024kibibytes", 1048576L},
+        {"1024m", 1073741824L},
+        {"1024mb", 1073741824L},
+        {"1024mebibytes", 1073741824L},
+        {"1024g", 1099511627776L},
+        {"1024gb", 1099511627776L},
+        {"1024gibibytes", 1099511627776L},
+        {"1024t", 1125899906842624L},
+        {"1024tb", 1125899906842624L},
+        {"1024tebibytes", 1125899906842624L},
+    };
+
+    int64_t size = 0;
+    for (const Case& c : cases) {
+        ASSERT_OK_AND_ASSIGN(size, MemorySize::ParseBytes(c.text));
+        ASSERT_EQ(size, c.expected_bytes);
+    }
+}
+
+// Unit spellings in mixed case, with whitespace around the expression and before the unit.
+TEST(MemorySizeTest, TestParseBytesUpperCaseAndWithSpace) {
+    struct Case {
+        const char* text;
+        int64_t expected_bytes;
+    };
+    const Case cases[] = {
+        {"1024 B", 1024L},
+        {"1024Bytes ", 1024L},
+        {"1024 K", 1048576L},
+        {"1024Kb ", 1048576L},
+        {"1024kiBIbyTes", 1048576L},
+        {"1024M", 1073741824L},
+        {"1024mB", 1073741824L},
+        {"1024 meBIbyteS", 1073741824L},
+        {"1024G", 1099511627776L},
+        {"1024 GB", 1099511627776L},
+        {"1024gIbibytes", 1099511627776L},
+        {"1024 T", 1125899906842624L},
+        {" 1024Tb ", 1125899906842624L},
+        {"1024tebiBytes ", 1125899906842624L},
+    };
+
+    int64_t size = 0;
+    for (const Case& c : cases) {
+        ASSERT_OK_AND_ASSIGN(size, MemorySize::ParseBytes(c.text));
+        ASSERT_EQ(size, c.expected_bytes);
+    }
+}
+
+// Inputs that ParseBytes must reject, each with the expected fragment of the error message.
+TEST(MemorySizeTest, TestInvalidInput) {
+    // Malformed text: nothing at all, an unknown unit, a sign where a digit is required.
+    ASSERT_NOK_WITH_MSG(MemorySize::ParseBytes(""),
+                        "argument is an empty or whitespace-only string");
+    ASSERT_NOK_WITH_MSG(MemorySize::ParseBytes("1 SomeUnknownUnit"),
+                        "does not match any of the recognized units");
+    ASSERT_NOK_WITH_MSG(MemorySize::ParseBytes("-1b"), "text does not start with a number");
+
+    // The number alone (UINT64_MAX) is already too large to be read as int64_t.
+    uint64_t beyond_int64 = std::numeric_limits<uint64_t>::max();
+    const std::string too_large_number = std::to_string(beyond_int64) + " b";
+    ASSERT_NOK_WITH_MSG(MemorySize::ParseBytes(too_large_number),
+                        "cannot be represented as 64bit number (numeric overflow)");
+
+    // The number (INT64_MAX) is readable, but multiplying it by the "kb" factor overflows.
+    int64_t largest_int64 = std::numeric_limits<int64_t>::max();
+    const std::string too_large_once_scaled = std::to_string(largest_int64) + " kb";
+    ASSERT_NOK_WITH_MSG(MemorySize::ParseBytes(too_large_once_scaled),
+                        "cannot be represented as 64bit number of bytes (numeric overflow)");
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/options/time_duration.cpp
b/src/paimon/common/options/time_duration.cpp
new file mode 100644
index 0000000..f48b830
--- /dev/null
+++ b/src/paimon/common/options/time_duration.cpp
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/options/time_duration.h"
+
+#include <cstddef>
+#include <limits>
+#include <optional>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Returns the nanosecond unit: values are divided by 1,000,000 to obtain milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::NanoSeconds() {
+    // Function-local static: constructed on the first call, the same object is returned afterwards.
+    static const TimeUnit nanos_unit(
+        std::vector<std::string>{"ns", "nano", "nanos", "nanosecond", "nanoseconds"},
+        int64_t{1000} * 1000, TimeUnit::CoefficientOperator::DIVIDE);
+    return nanos_unit;
+}
+
+/// Returns the microsecond unit: values are divided by 1,000 to obtain milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::MicroSeconds() {
+    // Function-local static: constructed on the first call, the same object is returned afterwards.
+    static const TimeUnit micros_unit(
+        std::vector<std::string>{"us", "µs", "micro", "micros", "microsecond", "microseconds"},
+        int64_t{1000}, TimeUnit::CoefficientOperator::DIVIDE);
+    return micros_unit;
+}
+
+/// Returns the millisecond unit: values are multiplied by 1, i.e. taken as they are.
+const TimeDuration::TimeUnit& TimeDuration::MilliSeconds() {
+    // Function-local static: constructed on the first call, the same object is returned afterwards.
+    static const TimeUnit millis_unit(
+        std::vector<std::string>{"ms", "milli", "millis", "millisecond", "milliseconds"},
+        int64_t{1}, TimeUnit::CoefficientOperator::MULTIPLY);
+    return millis_unit;
+}
+
+/// Returns the second unit: values are multiplied by 1,000 to obtain milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::Seconds() {
+    // Function-local static: constructed on the first call, the same object is returned afterwards.
+    static const TimeUnit seconds_unit(
+        std::vector<std::string>{"s", "sec", "secs", "second", "seconds"}, int64_t{1000},
+        TimeUnit::CoefficientOperator::MULTIPLY);
+    return seconds_unit;
+}
+
+/// Returns the minute unit: values are multiplied by 60,000 to obtain milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::Minutes() {
+    // Function-local static: constructed on the first call, the same object is returned afterwards.
+    static const TimeUnit minutes_unit(std::vector<std::string>{"min", "m", "minute", "minutes"},
+                                       int64_t{1000} * 60,
+                                       TimeUnit::CoefficientOperator::MULTIPLY);
+    return minutes_unit;
+}
+
+/// Returns the hour unit: values are multiplied by 3,600,000 to obtain milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::Hours() {
+    // Function-local static: constructed on the first call, the same object is returned afterwards.
+    static const TimeUnit hours_unit(std::vector<std::string>{"h", "hour", "hours"},
+                                     int64_t{1000} * 60 * 60,
+                                     TimeUnit::CoefficientOperator::MULTIPLY);
+    return hours_unit;
+}
+
+/// Returns the day unit: values are multiplied by 86,400,000 to obtain milliseconds.
+const TimeDuration::TimeUnit& TimeDuration::Days() {
+    // Function-local static: constructed on the first call, the same object is returned afterwards.
+    static const TimeUnit days_unit(std::vector<std::string>{"d", "day", "days"},
+                                    int64_t{1000} * 60 * 60 * 24,
+                                    TimeUnit::CoefficientOperator::MULTIPLY);
+    return days_unit;
+}
+
+/// Parses a duration expression such as "1000", "5 min" or "2d" into milliseconds.
+///
+/// The trimmed input is split into a leading run of ASCII digits and a unit suffix. The suffix
+/// is trimmed and lower-cased, then resolved with ParseUnit(). Depending on the unit, the number
+/// is multiplied by the unit's coefficient or divided by it (integer division).
+///
+/// @param text the expression to parse.
+/// @return the duration in milliseconds, or Status::Invalid when the text is empty or
+///         whitespace-only, does not start with a digit, carries a number that
+///         StringUtils::StringToValue<int64_t> cannot convert, names an unrecognized unit, or
+///         exceeds int64_t once multiplied by the unit's coefficient.
+Result<int64_t> TimeDuration::Parse(const std::string& text) {
+    std::string input = text;
+    StringUtils::Trim(&input);
+    if (input.empty()) {
+        return Status::Invalid("argument is an empty or whitespace-only string");
+    }
+
+    // Count the leading digits; everything after them is the unit suffix.
+    size_t digit_count = 0;
+    for (; digit_count < input.length(); ++digit_count) {
+        const char c = input[digit_count];
+        if (c < '0' || c > '9') {
+            break;
+        }
+    }
+    std::string number = input.substr(0, digit_count);
+    if (number.empty()) {
+        return Status::Invalid("text does not start with a number");
+    }
+
+    std::string suffix = input.substr(digit_count);
+    StringUtils::Trim(&suffix);
+    std::string unit = StringUtils::ToLowerCase(suffix);
+
+    std::optional<int64_t> value = StringUtils::StringToValue<int64_t>(number);
+    if (!value.has_value()) {
+        return Status::Invalid(fmt::format(
+            "The value '{}' cannot be represented as 64bit number (numeric overflow).", number));
+    }
+    PAIMON_ASSIGN_OR_RAISE(TimeUnit parsed_unit, ParseUnit(unit));
+    const int64_t coefficient = parsed_unit.GetCoefficient();
+    const bool divides =
+        parsed_unit.GetCoefficientOperator() == TimeUnit::CoefficientOperator::DIVIDE;
+
+    // Dividing can never overflow, so only multiplying units lower the accepted maximum.
+    const int64_t largest_accepted = divides
+                                         ? std::numeric_limits<int64_t>::max()
+                                         : std::numeric_limits<int64_t>::max() / coefficient;
+    if (*value > largest_accepted) {
+        return Status::Invalid(
+            fmt::format("The value '{}' cannot be represented as 64bit number of milliseconds "
+                        "(numeric overflow).",
+                        text));
+    }
+    return divides ? *value / coefficient : *value * coefficient;
+}
+
+/// Resolves a unit spelling to its TimeUnit.
+///
+/// @param unit the spelling to look up; it is compared verbatim against each unit's spellings.
+/// @return the first matching unit (checked from nanoseconds up to days); MilliSeconds() when
+///         `unit` is empty; Status::Invalid for any other unmatched text.
+Result<TimeDuration::TimeUnit> TimeDuration::ParseUnit(const std::string& unit) {
+    // Accessors are called one at a time so a unit is only touched when the search reaches it.
+    using UnitAccessor = const TimeUnit& (*)();
+    const UnitAccessor accessors[] = {&TimeDuration::NanoSeconds,  &TimeDuration::MicroSeconds,
+                                      &TimeDuration::MilliSeconds, &TimeDuration::Seconds,
+                                      &TimeDuration::Minutes,      &TimeDuration::Hours,
+                                      &TimeDuration::Days};
+    for (const UnitAccessor accessor : accessors) {
+        const TimeUnit& candidate = accessor();
+        if (MatchesAny(unit, candidate)) {
+            return candidate;
+        }
+    }
+    if (unit.empty()) {
+        // No suffix at all: the number is taken as milliseconds.
+        return MilliSeconds();
+    }
+    return Status::Invalid(fmt::format(
+        "Time duration unit '{}' does not match any of the recognized units", unit));
+}
+
+/// Returns true when `str` is equal to one of the spellings listed by `unit`.
+bool TimeDuration::MatchesAny(const std::string& str, const TimeDuration::TimeUnit& unit) {
+    const auto& spellings = unit.GetUnits();
+    auto it = spellings.begin();
+    // Advance until a spelling equals `str` or the list is exhausted.
+    while (it != spellings.end() && !(*it == str)) {
+        ++it;
+    }
+    return it != spellings.end();
+}
+
+} // namespace paimon
diff --git a/src/paimon/common/options/time_duration.h
b/src/paimon/common/options/time_duration.h
new file mode 100644
index 0000000..e1f93e0
--- /dev/null
+++ b/src/paimon/common/options/time_duration.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Static helpers for reading a time duration, expressed as a number of milliseconds, from text.
+///
+/// Parsing: an expression is a run of digits optionally followed by a unit, for example
+/// "1000", "5 min" or "2d". An expression without a unit is interpreted as milliseconds.
+///
+/// NOTE(review): unlike MemorySize, this class is not marked PAIMON_EXPORT. Confirm that this
+/// is intentional.
+class TimeDuration {
+ public:
+    // Only static members are offered; the class itself cannot be instantiated.
+    TimeDuration() = delete;
+    ~TimeDuration() = delete;
+
+    /// A time unit: the spellings it is recognized by plus the coefficient, and the operation,
+    /// that converts a number in this unit to milliseconds. Mostly used to parse values from a
+    /// configuration file.
+    ///
+    /// Supported suffixes:
+    ///  - 1ns or 1nano or 1nanos or 1nanosecond or 1nanoseconds
+    ///  - 1us or 1µs or 1micro or 1micros or 1microsecond or 1microseconds
+    ///  - 1ms or 1milli or 1millis or 1millisecond or 1milliseconds
+    ///  - 1s or 1sec or 1secs or 1second or 1seconds
+    ///  - 1min or 1m or 1minute or 1minutes
+    ///  - 1h or 1hour or 1hours
+    ///  - 1d or 1day or 1days
+    class TimeUnit {
+     public:
+        /// How the coefficient is applied to a parsed number.
+        enum class CoefficientOperator { MULTIPLY, DIVIDE };
+
+        /// @param units spellings this unit is matched against.
+        /// @param coefficient factor between this unit and milliseconds.
+        /// @param op whether a parsed number is multiplied or divided by `coefficient`.
+        TimeUnit(const std::vector<std::string>& units, int64_t coefficient,
+                 CoefficientOperator op)
+            : units_(units), coefficient_(coefficient), op_(op) {}
+
+        /// Spellings this unit is matched against.
+        const std::vector<std::string>& GetUnits() const { return units_; }
+
+        /// Factor between this unit and milliseconds.
+        int64_t GetCoefficient() const { return coefficient_; }
+
+        /// Whether a parsed number is multiplied or divided by the coefficient.
+        CoefficientOperator GetCoefficientOperator() const { return op_; }
+
+     private:
+        std::vector<std::string> units_;
+        int64_t coefficient_;
+        CoefficientOperator op_;
+    };
+
+    /// Accessors for the supported units.
+    static const TimeUnit& NanoSeconds();
+    static const TimeUnit& MicroSeconds();
+    static const TimeUnit& MilliSeconds();
+    static const TimeUnit& Seconds();
+    static const TimeUnit& Minutes();
+    static const TimeUnit& Hours();
+    static const TimeUnit& Days();
+
+    /// Parses `text` (digits plus optional unit) into a number of milliseconds.
+    static Result<int64_t> Parse(const std::string& text);
+    /// Maps a unit spelling to its TimeUnit.
+    static Result<TimeUnit> ParseUnit(const std::string& unit);
+    /// Tells whether `str` equals one of the spellings of `unit`.
+    static bool MatchesAny(const std::string& str, const TimeDuration::TimeUnit& unit);
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/options/time_duration_test.cpp
b/src/paimon/common/options/time_duration_test.cpp
new file mode 100644
index 0000000..1804481
--- /dev/null
+++ b/src/paimon/common/options/time_duration_test.cpp
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/options/time_duration.h"
+
+#include "gtest/gtest.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Every accepted unit spelling converts to the expected number of milliseconds, and malformed
+// expressions are rejected with the expected message.
+TEST(TimeDurationTest, TestParseTime) {
+    const std::string nine_digits = "123456789";
+    const std::string one_thousand = "1000";
+
+    // Sub-millisecond units are divided down; 1000 ns is less than one millisecond.
+    ASSERT_OK_AND_ASSIGN(int64_t millis, TimeDuration::Parse("1000 ns"));
+    ASSERT_EQ(millis, 0L);
+    for (const char* suffix :
+         {"ns", " ns", "nano", " nano", "nanos", "nanosecond", "nanoseconds"}) {
+        ASSERT_OK_AND_ASSIGN(millis, TimeDuration::Parse(nine_digits + suffix));
+        ASSERT_EQ(millis, 123L);
+    }
+    for (const char* suffix :
+         {"us", " us", "µs", "micro", "micros", "microsecond", "microseconds"}) {
+        ASSERT_OK_AND_ASSIGN(millis, TimeDuration::Parse(nine_digits + suffix));
+        ASSERT_EQ(millis, 123456L);
+    }
+
+    for (const char* suffix :
+         {"ms", "milli", " milli", "millis", "millisecond", "milliseconds"}) {
+        ASSERT_OK_AND_ASSIGN(millis, TimeDuration::Parse(one_thousand + suffix));
+        ASSERT_EQ(millis, 1000L);
+    }
+    // without time unit, default time unit is milli
+    ASSERT_OK_AND_ASSIGN(millis, TimeDuration::Parse("1000"));
+    ASSERT_EQ(millis, 1000L);
+
+    // Units above one millisecond are multiplied up.
+    for (const char* suffix : {"s", "sec", "secs", " second", "seconds"}) {
+        ASSERT_OK_AND_ASSIGN(millis, TimeDuration::Parse(one_thousand + suffix));
+        ASSERT_EQ(millis, 1000000L);
+    }
+    for (const char* suffix : {" min", "m", "minute", "minutes"}) {
+        ASSERT_OK_AND_ASSIGN(millis, TimeDuration::Parse(one_thousand + suffix));
+        ASSERT_EQ(millis, 60000000L);
+    }
+    for (const char* suffix : {"h", "hour", " hours"}) {
+        ASSERT_OK_AND_ASSIGN(millis, TimeDuration::Parse(one_thousand + suffix));
+        ASSERT_EQ(millis, 3600000000L);
+    }
+    for (const char* suffix : {"d", "day", " days"}) {
+        ASSERT_OK_AND_ASSIGN(millis, TimeDuration::Parse(one_thousand + suffix));
+        ASSERT_EQ(millis, 86400000000L);
+    }
+
+    // with invalid time unit
+    ASSERT_NOK_WITH_MSG(TimeDuration::Parse("1000ss"),
+                        "Time duration unit 'ss' does not match any of the recognized units");
+    // with invalid time duration
+    ASSERT_NOK_WITH_MSG(TimeDuration::Parse(""),
+                        "argument is an empty or whitespace-only string");
+    ASSERT_NOK_WITH_MSG(TimeDuration::Parse(" "),
+                        "argument is an empty or whitespace-only string");
+    ASSERT_NOK_WITH_MSG(TimeDuration::Parse("ns"), "text does not start with a number");
+}
+
+// For each multiplying unit: the largest value that is still accepted and the next value, which
+// must be rejected.
+TEST(TimeDurationTest, TestBoundaryCheck) {
+    ASSERT_OK_AND_ASSIGN(int64_t millis, TimeDuration::Parse("1000 us"));
+    ASSERT_EQ(millis, 1L);
+
+    struct Boundary {
+        const char* last_accepted;
+        const char* first_rejected;
+    };
+    const Boundary boundaries[] = {
+        {"106751991167d", "106751991168d"},
+        {"2562047788015h", "2562047788016h"},
+        {"153722867280912m", "153722867280913m"},
+        {"9223372036854775s", "9223372036854776s"},
+        {"9223372036854775807ms", "9223372036854775808ms"},
+    };
+    for (const Boundary& b : boundaries) {
+        ASSERT_OK(TimeDuration::Parse(b.last_accepted));
+        ASSERT_NOK(TimeDuration::Parse(b.first_rejected));
+    }
+}
+} // namespace paimon::test