This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fd9edb  feat: add data converter utils, field type utils, file type 
and decimal utils (#38)
9fd9edb is described below

commit 9fd9edbbd258bb0b7c720800826ac3fca4cf1d19
Author: lszskye <[email protected]>
AuthorDate: Thu Jun 4 11:01:31 2026 +0800

    feat: add data converter utils, field type utils, file type and decimal 
utils (#38)
---
 src/paimon/common/utils/data_converter_utils.h     | 215 +++++++++++++++++++++
 .../common/utils/data_converter_utils_test.cpp     | 145 ++++++++++++++
 src/paimon/common/utils/decimal_utils.cpp          | 103 ++++++++++
 src/paimon/common/utils/decimal_utils.h            |  52 +++++
 src/paimon/common/utils/decimal_utils_test.cpp     |  36 ++++
 src/paimon/common/utils/field_type_utils.h         | 131 +++++++++++++
 src/paimon/common/utils/field_type_utils_test.cpp  | 134 +++++++++++++
 src/paimon/common/utils/file_type.cpp              | 128 ++++++++++++
 src/paimon/common/utils/file_type.h                |  43 +++++
 src/paimon/common/utils/file_type_test.cpp         | 183 ++++++++++++++++++
 10 files changed, 1170 insertions(+)

diff --git a/src/paimon/common/utils/data_converter_utils.h 
b/src/paimon/common/utils/data_converter_utils.h
new file mode 100644
index 0000000..3ffeb98
--- /dev/null
+++ b/src/paimon/common/utils/data_converter_utils.h
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+#include <charconv>
+#include <cstdint>
+#include <functional>
+#include <iomanip>
+#include <memory>
+#include <optional>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "arrow/type.h"
+#include "fmt/core.h"
+#include "fmt/format.h"
+#include "fmt/ranges.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/core/casting/date_to_string_cast_executor.h"
+#include "paimon/defs.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class MemoryPool;
+
+#define RETURN_INVALID_WITH_FIELD_INFO(value, field_idx, value_str, type)      
              \
+    if ((value) == std::nullopt) {                                             
              \
+        return Status::Invalid(                                                
              \
+            fmt::format("cannot convert field idx {}, field value {} to type 
{}", field_idx, \
+                        value_str, type));                                     
              \
+    }
+
+// Static-only factory for converters between string values and BinaryRow fields.
+// Supported arrow types: BOOL, INT8, INT16, INT32, INT64, STRING and DATE32; every other type
+// yields Status::NotImplemented.
+class DataConverterUtils {
+ public:
+    DataConverterUtils() = delete;
+    ~DataConverterUtils() = delete;
+
+    // Parses a string and writes the value into the field at the given index of the writer.
+    using StrToBinaryRowConverter =
+        std::function<Status(const std::string&, int32_t, BinaryRowWriter*)>;
+    // Reads the field at the given index of a row and renders it as a string.
+    using BinaryRowFieldToStrConverter =
+        std::function<Result<std::string>(const BinaryRow&, int32_t)>;
+
+    // Builds the string -> BinaryRow field converter for `type`.
+    // `pool` is captured by the STRING converter and passed to BinaryString::FromString on every
+    // call, so it has to stay valid for as long as the returned converter is used.
+    static Result<StrToBinaryRowConverter> CreateDataToBinaryRowConverter(arrow::Type::type type,
+                                                                          MemoryPool* pool) {
+        switch (type) {
+            case arrow::Type::BOOL:
+                return StrToBinaryRowConverter(
+                    [](const std::string& text, int32_t pos, BinaryRowWriter* out) {
+                        auto parsed = StringUtils::StringToValue<bool>(text);
+                        RETURN_INVALID_WITH_FIELD_INFO(
+                            parsed, pos, text, arrow::internal::ToString(arrow::Type::BOOL));
+                        out->WriteBoolean(pos, parsed.value());
+                        return Status::OK();
+                    });
+            case arrow::Type::INT8:
+                return StrToBinaryRowConverter(
+                    [](const std::string& text, int32_t pos, BinaryRowWriter* out) {
+                        auto parsed = StringUtils::StringToValue<int8_t>(text);
+                        RETURN_INVALID_WITH_FIELD_INFO(
+                            parsed, pos, text, arrow::internal::ToString(arrow::Type::INT8));
+                        out->WriteByte(pos, parsed.value());
+                        return Status::OK();
+                    });
+            case arrow::Type::INT16:
+                return StrToBinaryRowConverter(
+                    [](const std::string& text, int32_t pos, BinaryRowWriter* out) {
+                        auto parsed = StringUtils::StringToValue<int16_t>(text);
+                        RETURN_INVALID_WITH_FIELD_INFO(
+                            parsed, pos, text, arrow::internal::ToString(arrow::Type::INT16));
+                        out->WriteShort(pos, parsed.value());
+                        return Status::OK();
+                    });
+            case arrow::Type::INT32:
+                return StrToBinaryRowConverter(
+                    [](const std::string& text, int32_t pos, BinaryRowWriter* out) {
+                        auto parsed = StringUtils::StringToValue<int32_t>(text);
+                        RETURN_INVALID_WITH_FIELD_INFO(
+                            parsed, pos, text, arrow::internal::ToString(arrow::Type::INT32));
+                        out->WriteInt(pos, parsed.value());
+                        return Status::OK();
+                    });
+            case arrow::Type::INT64:
+                return StrToBinaryRowConverter(
+                    [](const std::string& text, int32_t pos, BinaryRowWriter* out) {
+                        auto parsed = StringUtils::StringToValue<int64_t>(text);
+                        RETURN_INVALID_WITH_FIELD_INFO(
+                            parsed, pos, text, arrow::internal::ToString(arrow::Type::INT64));
+                        out->WriteLong(pos, parsed.value());
+                        return Status::OK();
+                    });
+            case arrow::Type::STRING:
+                return StrToBinaryRowConverter(
+                    [pool](const std::string& text, int32_t pos, BinaryRowWriter* out) {
+                        BinaryString binary_text = BinaryString::FromString(text, pool);
+                        out->WriteString(pos, binary_text);
+                        return Status::OK();
+                    });
+            case arrow::Type::DATE32:
+                // Dates are stored through WriteInt, using whatever int32 StringToDate produces.
+                return StrToBinaryRowConverter(
+                    [](const std::string& text, int32_t pos, BinaryRowWriter* out) {
+                        PAIMON_ASSIGN_OR_RAISE(int32_t days, StringUtils::StringToDate(text));
+                        out->WriteInt(pos, days);
+                        return Status::OK();
+                    });
+            default:
+                return Status::NotImplemented(
+                    fmt::format("Do not support type {} in partition binary row",
+                                arrow::internal::ToString(type)));
+        }
+    }
+
+    // Builds the BinaryRow field -> string converter for `type`.
+    // For DATE32, `legacy_partition_name_enabled` selects between printing the raw int32 value
+    // (true) and formatting it through DateToStringCastExecutor (false).
+    static Result<BinaryRowFieldToStrConverter> CreateBinaryRowFieldToStringConverter(
+        arrow::Type::type type, bool legacy_partition_name_enabled) {
+        switch (type) {
+            case arrow::Type::BOOL:
+                return BinaryRowFieldToStrConverter([](const BinaryRow& row, int32_t pos) {
+                    return std::string(row.GetBoolean(pos) ? "true" : "false");
+                });
+            case arrow::Type::INT8:
+                return BinaryRowFieldToStrConverter([](const BinaryRow& row, int32_t pos) {
+                    // Cast first so the byte is printed as a signed 8-bit number.
+                    return std::to_string(static_cast<int8_t>(row.GetByte(pos)));
+                });
+            case arrow::Type::INT16:
+                return BinaryRowFieldToStrConverter([](const BinaryRow& row, int32_t pos) {
+                    return std::to_string(row.GetShort(pos));
+                });
+            case arrow::Type::INT32:
+                return BinaryRowFieldToStrConverter([](const BinaryRow& row, int32_t pos) {
+                    return std::to_string(row.GetInt(pos));
+                });
+            case arrow::Type::INT64:
+                return BinaryRowFieldToStrConverter([](const BinaryRow& row, int32_t pos) {
+                    return std::to_string(row.GetLong(pos));
+                });
+            case arrow::Type::STRING:
+                return BinaryRowFieldToStrConverter([](const BinaryRow& row, int32_t pos) {
+                    BinaryString stored = row.GetString(pos);
+                    return stored.ToString();
+                });
+            case arrow::Type::DATE32: {
+                if (legacy_partition_name_enabled) {
+                    return BinaryRowFieldToStrConverter(
+                        [](const BinaryRow& row, int32_t pos) -> Result<std::string> {
+                            int32_t days = row.GetDate(pos);
+                            return std::to_string(days);
+                        });
+                }
+                // The executor is shared so copies of the converter reuse one instance.
+                auto caster = std::make_shared<DateToStringCastExecutor>();
+                return BinaryRowFieldToStrConverter(
+                    [caster](const BinaryRow& row, int32_t pos) -> Result<std::string> {
+                        int32_t days = row.GetDate(pos);
+                        PAIMON_ASSIGN_OR_RAISE(
+                            Literal as_text,
+                            caster->Cast(Literal(FieldType::DATE, days), arrow::utf8()));
+                        return as_text.GetValue<std::string>();
+                    });
+            }
+            default:
+                return Status::NotImplemented(
+                    fmt::format("Do not support arrow {} in partition binary row",
+                                arrow::internal::ToString(type)));
+        }
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/data_converter_utils_test.cpp 
b/src/paimon/common/utils/data_converter_utils_test.cpp
new file mode 100644
index 0000000..b007855
--- /dev/null
+++ b/src/paimon/common/utils/data_converter_utils_test.cpp
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/data_converter_utils.h"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "arrow/type_fwd.h"
+#include "gtest/gtest.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(DataConverterUtilsTest, TestDataToBinaryRowConverterWithLegacyPartitionName) {
+    // Round trip for every supported type: string -> BinaryRow field -> string.
+    // With legacy partition names enabled a DATE32 field is rendered as its raw integer.
+    auto pool = GetDefaultPool();
+    std::vector<std::pair<std::string, arrow::Type::type>> cases = {
+        {"true", arrow::Type::BOOL},
+        {"10", arrow::Type::INT8},
+        {"-20", arrow::Type::INT8},
+        {"1556", arrow::Type::INT16},
+        {"-2556", arrow::Type::INT16},
+        {"348489", arrow::Type::INT32},
+        {"-448489", arrow::Type::INT32},
+        {"279039", arrow::Type::INT64},
+        {"1234567", arrow::Type::INT64},
+        {"abcde", arrow::Type::STRING},
+        {"这是一个很长很长的中文", arrow::Type::STRING},
+        {"10440", arrow::Type::DATE32}};
+
+    // One converter pair per case, in the same order as `cases`.
+    std::vector<DataConverterUtils::StrToBinaryRowConverter> to_row;
+    std::vector<DataConverterUtils::BinaryRowFieldToStrConverter> to_str;
+    for (const auto& field : cases) {
+        const arrow::Type::type type = field.second;
+        ASSERT_OK_AND_ASSIGN(
+            auto writer_fn, DataConverterUtils::CreateDataToBinaryRowConverter(type, pool.get()));
+        to_row.emplace_back(std::move(writer_fn));
+        ASSERT_OK_AND_ASSIGN(auto reader_fn,
+                             DataConverterUtils::CreateBinaryRowFieldToStringConverter(
+                                 type, /*legacy_partition_name_enabled=*/true));
+        to_str.emplace_back(std::move(reader_fn));
+    }
+    // test not implement type
+    ASSERT_NOK(
+        DataConverterUtils::CreateDataToBinaryRowConverter(arrow::Type::LIST, pool.get()));
+
+    BinaryRow row(cases.size());
+    BinaryRowWriter writer(&row, 0, pool.get());
+    for (size_t pos = 0; pos < cases.size(); pos++) {
+        ASSERT_OK(to_row[pos](cases[pos].first, pos, &writer));
+    }
+    // test invalid str
+    ASSERT_NOK(to_row[0]("abc", /*idx=*/0, &writer));
+    writer.Complete();
+
+    // The typed getters must see exactly what was parsed from the strings.
+    ASSERT_EQ(cases.size(), row.GetFieldCount());
+    ASSERT_EQ(true, row.GetBoolean(0));
+    ASSERT_EQ(10, row.GetByte(1));
+    ASSERT_EQ(-20, row.GetByte(2));
+    ASSERT_EQ(1556, row.GetShort(3));
+    ASSERT_EQ(-2556, row.GetShort(4));
+    ASSERT_EQ(348489, row.GetInt(5));
+    ASSERT_EQ(-448489, row.GetInt(6));
+    ASSERT_EQ(279039, row.GetLong(7));
+    ASSERT_EQ(1234567, row.GetLong(8));
+    ASSERT_EQ("abcde", row.GetString(9).ToString());
+    ASSERT_EQ("这是一个很长很长的中文", row.GetString(10).ToString());
+    ASSERT_EQ(10440, row.GetDate(11));
+
+    // Converting back must reproduce the original input strings.
+    for (size_t pos = 0; pos < cases.size(); pos++) {
+        ASSERT_OK_AND_ASSIGN(auto rendered, to_str[pos](row, pos));
+        ASSERT_EQ(cases[pos].first, rendered);
+    }
+}
+
+TEST(DataConverterUtilsTest, TestDataToBinaryRowConverterWithNoLegacyPartitionName) {
+    // Same round trip as the legacy test, but DATE32 is given and rendered as "1998-08-02"
+    // while the row still stores the value 10440.
+    auto pool = GetDefaultPool();
+    std::vector<std::pair<std::string, arrow::Type::type>> cases = {
+        {"true", arrow::Type::BOOL},
+        {"10", arrow::Type::INT8},
+        {"-20", arrow::Type::INT8},
+        {"1556", arrow::Type::INT16},
+        {"-2556", arrow::Type::INT16},
+        {"348489", arrow::Type::INT32},
+        {"-448489", arrow::Type::INT32},
+        {"279039", arrow::Type::INT64},
+        {"1234567", arrow::Type::INT64},
+        {"abcde", arrow::Type::STRING},
+        {"这是一个很长很长的中文", arrow::Type::STRING},
+        {"1998-08-02", arrow::Type::DATE32}};
+
+    // One converter pair per case, in the same order as `cases`.
+    std::vector<DataConverterUtils::StrToBinaryRowConverter> to_row;
+    std::vector<DataConverterUtils::BinaryRowFieldToStrConverter> to_str;
+    for (const auto& field : cases) {
+        const arrow::Type::type type = field.second;
+        ASSERT_OK_AND_ASSIGN(
+            auto writer_fn, DataConverterUtils::CreateDataToBinaryRowConverter(type, pool.get()));
+        to_row.emplace_back(std::move(writer_fn));
+        ASSERT_OK_AND_ASSIGN(auto reader_fn,
+                             DataConverterUtils::CreateBinaryRowFieldToStringConverter(
+                                 type, /*legacy_partition_name_enabled=*/false));
+        to_str.emplace_back(std::move(reader_fn));
+    }
+    BinaryRow row(cases.size());
+    BinaryRowWriter writer(&row, 0, pool.get());
+    for (size_t pos = 0; pos < cases.size(); pos++) {
+        ASSERT_OK(to_row[pos](cases[pos].first, pos, &writer));
+    }
+    writer.Complete();
+
+    // The typed getters must see exactly what was parsed from the strings.
+    ASSERT_EQ(cases.size(), row.GetFieldCount());
+    ASSERT_EQ(true, row.GetBoolean(0));
+    ASSERT_EQ(10, row.GetByte(1));
+    ASSERT_EQ(-20, row.GetByte(2));
+    ASSERT_EQ(1556, row.GetShort(3));
+    ASSERT_EQ(-2556, row.GetShort(4));
+    ASSERT_EQ(348489, row.GetInt(5));
+    ASSERT_EQ(-448489, row.GetInt(6));
+    ASSERT_EQ(279039, row.GetLong(7));
+    ASSERT_EQ(1234567, row.GetLong(8));
+    ASSERT_EQ("abcde", row.GetString(9).ToString());
+    ASSERT_EQ("这是一个很长很长的中文", row.GetString(10).ToString());
+    ASSERT_EQ(10440, row.GetDate(11));
+
+    // Converting back must reproduce the original input strings.
+    for (size_t pos = 0; pos < cases.size(); pos++) {
+        ASSERT_OK_AND_ASSIGN(auto rendered, to_str[pos](row, pos));
+        ASSERT_EQ(cases[pos].first, rendered);
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/decimal_utils.cpp 
b/src/paimon/common/utils/decimal_utils.cpp
new file mode 100644
index 0000000..ff4aa1b
--- /dev/null
+++ b/src/paimon/common/utils/decimal_utils.cpp
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/decimal_utils.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/util/basic_decimal.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "fmt/format.h"
+
+namespace paimon {
+// Validates that `type` is an arrow::Decimal128Type whose precision lies within
+// [Decimal::MIN_PRECISION, Decimal::MAX_PRECISION] and is not smaller than its scale.
+// Returns Status::Invalid describing the first violated rule, Status::OK otherwise.
+Status DecimalUtils::CheckDecimalType(const arrow::DataType& type) {
+    const auto* as_decimal = dynamic_cast<const arrow::Decimal128Type*>(&type);
+    if (as_decimal == nullptr) {
+        return Status::Invalid(fmt::format("Invalid decimal type: {}", type.ToString()));
+    }
+
+    const bool precision_in_range = as_decimal->precision() >= Decimal::MIN_PRECISION &&
+                                    as_decimal->precision() <= Decimal::MAX_PRECISION;
+    if (!precision_in_range) {
+        return Status::Invalid(fmt::format("Invalid decimal type, precision must in range [{}, {}]",
+                                           Decimal::MIN_PRECISION, Decimal::MAX_PRECISION));
+    }
+
+    if (as_decimal->scale() > as_decimal->precision()) {
+        return Status::Invalid(fmt::format("Invalid decimal type {}, precision must >= scale",
+                                           as_decimal->ToString()));
+    }
+    return Status::OK();
+}
+
+// Rescales `src_decimal` from `src_scale` to `target_scale`.
+// Returns std::nullopt when increasing the scale would push the value past the 128-bit
+// sentinel bounds, or when the rescaled value does not fit in `target_precision` digits.
+// Reducing the scale rounds the value.
+std::optional<arrow::Decimal128> DecimalUtils::RescaleDecimalWithOverflowCheck(
+    const arrow::Decimal128& src_decimal, int32_t src_scale, int32_t target_precision,
+    int32_t target_scale) {
+    arrow::Decimal128 rescaled = src_decimal;
+    if (target_scale > src_scale) {
+        const int32_t extra_digits = target_scale - src_scale;
+        const auto& multiplier = arrow::BasicDecimal128::GetScaleMultiplier(extra_digits);
+        // Largest and smallest inputs that can still be multiplied without leaving 128 bits.
+        // arrow::Decimal.Rescale() is not safe here, hence the manual bound check.
+        arrow::BasicDecimal128 lowest_safe = arrow::BasicDecimal128::GetMinSentinel();
+        lowest_safe /= multiplier;
+        arrow::BasicDecimal128 highest_safe = arrow::BasicDecimal128::GetMaxSentinel();
+        highest_safe /= multiplier;
+        if (src_decimal > highest_safe || src_decimal < lowest_safe) {
+            return std::nullopt;
+        }
+        rescaled = src_decimal.IncreaseScaleBy(extra_digits);
+    } else if (target_scale < src_scale) {
+        const int32_t dropped_digits = src_scale - target_scale;
+        rescaled = src_decimal.ReduceScaleBy(dropped_digits, /*round=*/true);
+    }
+
+    if (rescaled.FitsInPrecision(target_precision)) {
+        return rescaled;
+    }
+    return std::nullopt;
+}
+
+// Parses a base-10 integer string into a signed 128-bit integer.
+//
+// Accepted input: an empty string (yields 0, kept for backward compatibility), or an optional
+// leading '-' or '+' followed by at least one ASCII digit. Everything else returns
+// Status::Invalid: non-digit characters anywhere in the string, a sign without digits, or a
+// value outside the signed 128-bit range.
+//
+// The previous implementation fed 18-character chunks to std::stoll, which silently accepted
+// trailing garbage ("12abc" parsed as 12), embedded whitespace/signs, and overflowed without
+// detection. Parsing digit by digit avoids exceptions and makes every rejection explicit.
+Result<Decimal::int128_t> DecimalUtils::StrToInt128(const std::string& str) {
+    using int128_t = Decimal::int128_t;
+    const auto invalid = [&str]() {
+        return Status::Invalid(
+            fmt::format("invalid string: [{}], cannot convert to int128", str));
+    };
+
+    int128_t ret = 0;
+    if (str.empty()) {
+        return ret;
+    }
+
+    const bool is_negative = str[0] == '-';
+    const size_t first_digit = (is_negative || str[0] == '+') ? 1 : 0;
+    if (first_digit == str.length()) {
+        // A sign with nothing after it is not a number.
+        return invalid();
+    }
+
+    // 2^127 - 1, built without ever overflowing the signed type.
+    const int128_t max_value = ((static_cast<int128_t>(1) << 126) - 1) * 2 + 1;
+    const int128_t min_value = -max_value - 1;
+
+    // Accumulate as a non-positive number: the negative range is one larger than the positive
+    // one, so the minimum value can be parsed without overflow.
+    for (size_t posn = first_digit; posn < str.length(); ++posn) {
+        const char ch = str[posn];
+        if (ch < '0' || ch > '9') {
+            return invalid();
+        }
+        const int digit = ch - '0';
+        // ret * 10 - digit >= min_value  <=>  ret >= (min_value + digit) / 10, because integer
+        // division truncates toward zero, which rounds this negative bound up.
+        if (ret < (min_value + digit) / 10) {
+            return invalid();
+        }
+        ret = ret * 10 - digit;
+    }
+
+    if (is_negative) {
+        return ret;
+    }
+    if (ret == min_value) {
+        // The magnitude of the minimum value has no positive counterpart.
+        return invalid();
+    }
+    ret = -ret;
+    return ret;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/decimal_utils.h 
b/src/paimon/common/utils/decimal_utils.h
new file mode 100644
index 0000000..e8b10fe
--- /dev/null
+++ b/src/paimon/common/utils/decimal_utils.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <optional>
+#include <string>
+
+#include "arrow/api.h"
+#include "arrow/util/decimal.h"
+#include "fmt/format.h"
+#include "paimon/data/decimal.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class DataType;
+}  // namespace arrow
+
+namespace paimon {
+// Static-only helpers for validating and converting decimal values.
+class DecimalUtils {
+ public:
+    DecimalUtils() = delete;
+    ~DecimalUtils() = delete;
+
+    // Returns Status::OK when `type` is an arrow Decimal128 type whose precision is within
+    // [Decimal::MIN_PRECISION, Decimal::MAX_PRECISION] and not smaller than its scale;
+    // otherwise returns Status::Invalid.
+    static Status CheckDecimalType(const arrow::DataType& type);
+
+    // Rescales `src_decimal` from `src_scale` to `target_scale`. Returns std::nullopt when
+    // increasing the scale would overflow or when the result does not fit in
+    // `target_precision`.
+    static std::optional<arrow::Decimal128> RescaleDecimalWithOverflowCheck(
+        const arrow::Decimal128& src_decimal, int32_t src_scale, int32_t 
target_precision,
+        int32_t target_scale);
+
+    // Converts a decimal digit string, optionally prefixed with '-', to a 128-bit integer.
+    // Returns an Invalid status when the string cannot be converted.
+    static Result<Decimal::int128_t> StrToInt128(const std::string& str);
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/decimal_utils_test.cpp 
b/src/paimon/common/utils/decimal_utils_test.cpp
new file mode 100644
index 0000000..821829a
--- /dev/null
+++ b/src/paimon/common/utils/decimal_utils_test.cpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/decimal_utils.h"
+
+#include <memory>
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(DecimalUtilsTest, TestCheckDecimalType) {
+    // A type that is not Decimal128 is rejected outright.
+    const auto non_decimal = arrow::int32();
+    ASSERT_NOK_WITH_MSG(DecimalUtils::CheckDecimalType(*non_decimal), "Invalid decimal type:");
+
+    // Scale (22) larger than precision (20) is rejected.
+    const auto scale_exceeds_precision = arrow::decimal128(20, 22);
+    ASSERT_NOK_WITH_MSG(DecimalUtils::CheckDecimalType(*scale_exceeds_precision),
+                        "precision must >= scale");
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/field_type_utils.h 
b/src/paimon/common/utils/field_type_utils.h
new file mode 100644
index 0000000..77da7db
--- /dev/null
+++ b/src/paimon/common/utils/field_type_utils.h
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/type_fwd.h"
+#include "fmt/format.h"
+#include "paimon/defs.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+// Static-only helpers for classifying FieldType values and converting to and from them.
+class FieldTypeUtils {
+ public:
+    FieldTypeUtils() = delete;
+    ~FieldTypeUtils() = delete;
+
+    // True for TINYINT, SMALLINT, INT and BIGINT; false for every other type.
+    static bool IsIntegerNumeric(const FieldType& type) {
+        switch (type) {
+            case FieldType::TINYINT:
+            case FieldType::SMALLINT:
+            case FieldType::INT:
+            case FieldType::BIGINT:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    // Compares integer widths with the following rules:
+    //   SMALLINT -> true only against TINYINT
+    //   INT      -> true against anything except BIGINT (including INT itself)
+    //   BIGINT   -> always true
+    //   others   -> false
+    static bool IntegerScaleLargerThan(const FieldType& type, const FieldType& other_type) {
+        switch (type) {
+            case FieldType::SMALLINT:
+                return other_type == FieldType::TINYINT;
+            case FieldType::INT:
+                return other_type != FieldType::BIGINT;
+            case FieldType::BIGINT:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    // Maps an arrow type id to the matching FieldType.
+    // Returns Status::Invalid carrying the numeric arrow id when there is no mapping.
+    static Result<FieldType> ConvertToFieldType(const arrow::Type::type& arrow_type) {
+        struct TypeMapping {
+            arrow::Type::type arrow_id;
+            FieldType field_type;
+        };
+        static constexpr TypeMapping kMappings[] = {
+            {arrow::Type::type::BOOL, FieldType::BOOLEAN},
+            {arrow::Type::type::INT8, FieldType::TINYINT},
+            {arrow::Type::type::INT16, FieldType::SMALLINT},
+            {arrow::Type::type::INT32, FieldType::INT},
+            {arrow::Type::type::INT64, FieldType::BIGINT},
+            {arrow::Type::type::FLOAT, FieldType::FLOAT},
+            {arrow::Type::type::DOUBLE, FieldType::DOUBLE},
+            {arrow::Type::type::STRING, FieldType::STRING},
+            {arrow::Type::type::BINARY, FieldType::BINARY},
+            // TODO(xinyu): binary to large binary?
+            {arrow::Type::type::LARGE_BINARY, FieldType::BLOB},
+            {arrow::Type::type::TIMESTAMP, FieldType::TIMESTAMP},
+            {arrow::Type::type::DECIMAL128, FieldType::DECIMAL},
+            {arrow::Type::type::DATE32, FieldType::DATE},
+            {arrow::Type::type::LIST, FieldType::ARRAY},
+            {arrow::Type::type::MAP, FieldType::MAP},
+            {arrow::Type::type::STRUCT, FieldType::STRUCT},
+        };
+        for (const auto& mapping : kMappings) {
+            if (mapping.arrow_id == arrow_type) {
+                return mapping.field_type;
+            }
+        }
+        return Status::Invalid(
+            fmt::format("Not support arrow type {}", static_cast<int32_t>(arrow_type)));
+    }
+
+    // Returns the upper-case name of `type`, or "UNKNOWN, type id:<n>" for values without a
+    // name in the table below.
+    static std::string FieldTypeToString(const FieldType& type) {
+        struct TypeName {
+            FieldType field_type;
+            const char* name;
+        };
+        static constexpr TypeName kNames[] = {
+            {FieldType::BOOLEAN, "BOOLEAN"},
+            {FieldType::TINYINT, "TINYINT"},
+            {FieldType::SMALLINT, "SMALLINT"},
+            {FieldType::INT, "INT"},
+            {FieldType::BIGINT, "BIGINT"},
+            {FieldType::FLOAT, "FLOAT"},
+            {FieldType::DOUBLE, "DOUBLE"},
+            {FieldType::STRING, "STRING"},
+            {FieldType::BINARY, "BINARY"},
+            {FieldType::BLOB, "BLOB"},
+            {FieldType::TIMESTAMP, "TIMESTAMP"},
+            {FieldType::DECIMAL, "DECIMAL"},
+            {FieldType::DATE, "DATE"},
+            {FieldType::ARRAY, "ARRAY"},
+            {FieldType::MAP, "MAP"},
+            {FieldType::STRUCT, "STRUCT"},
+        };
+        for (const auto& entry : kNames) {
+            if (entry.field_type == type) {
+                return entry.name;
+            }
+        }
+        return "UNKNOWN, type id:" + std::to_string(static_cast<int32_t>(type));
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/field_type_utils_test.cpp 
b/src/paimon/common/utils/field_type_utils_test.cpp
new file mode 100644
index 0000000..7c2b237
--- /dev/null
+++ b/src/paimon/common/utils/field_type_utils_test.cpp
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/field_type_utils.h"
+
+#include "arrow/type_fwd.h"
+#include "gtest/gtest.h"
+#include "paimon/defs.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// IsIntegerNumeric must accept the four integer types and reject the others checked here.
+TEST(FieldTypeUtilsTest, IsIntegerNumeric) {
+    for (const FieldType integer_type :
+         {FieldType::TINYINT, FieldType::SMALLINT, FieldType::INT, FieldType::BIGINT}) {
+        ASSERT_TRUE(FieldTypeUtils::IsIntegerNumeric(integer_type));
+    }
+
+    // Non-integer types should return false.
+    for (const FieldType other_type :
+         {FieldType::FLOAT, FieldType::STRING, FieldType::TIMESTAMP, FieldType::DECIMAL}) {
+        ASSERT_FALSE(FieldTypeUtils::IsIntegerNumeric(other_type));
+    }
+}
+
+// IntegerScaleLargerThan: pairs with the wider type first are true, the reversed pairs false.
+TEST(FieldTypeUtilsTest, IntegerScaleLargerThan) {
+    struct TypePair {
+        FieldType first;
+        FieldType second;
+    };
+
+    const TypePair wider_first[] = {
+        {FieldType::SMALLINT, FieldType::TINYINT},
+        {FieldType::INT, FieldType::SMALLINT},
+        {FieldType::BIGINT, FieldType::INT},
+        {FieldType::BIGINT, FieldType::SMALLINT},
+    };
+    for (const TypePair& pair : wider_first) {
+        ASSERT_TRUE(FieldTypeUtils::IntegerScaleLargerThan(pair.first, pair.second));
+    }
+
+    // Should return false for other cases.
+    const TypePair narrower_first[] = {
+        {FieldType::TINYINT, FieldType::SMALLINT},
+        {FieldType::SMALLINT, FieldType::INT},
+        {FieldType::INT, FieldType::BIGINT},
+    };
+    for (const TypePair& pair : narrower_first) {
+        ASSERT_FALSE(FieldTypeUtils::IntegerScaleLargerThan(pair.first, pair.second));
+    }
+}
+
+// Checks ConvertToFieldType for the Arrow type ids listed below, plus one id that
+// is expected to be rejected.
+TEST(FieldTypeUtilsTest, ConvertToFieldType) {
+    // Valid Arrow type ids and the FieldType each one must convert to.
+    ASSERT_OK_AND_ASSIGN(auto result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::BOOL));
+    ASSERT_EQ(result, FieldType::BOOLEAN);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::INT8));
+    ASSERT_EQ(result, FieldType::TINYINT);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::INT16));
+    ASSERT_EQ(result, FieldType::SMALLINT);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::INT32));
+    ASSERT_EQ(result, FieldType::INT);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::INT64));
+    ASSERT_EQ(result, FieldType::BIGINT);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::FLOAT));
+    ASSERT_EQ(result, FieldType::FLOAT);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::DOUBLE));
+    ASSERT_EQ(result, FieldType::DOUBLE);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::STRING));
+    ASSERT_EQ(result, FieldType::STRING);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::BINARY));
+    ASSERT_EQ(result, FieldType::BINARY);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::TIMESTAMP));
+    ASSERT_EQ(result, FieldType::TIMESTAMP);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::DECIMAL128));
+    ASSERT_EQ(result, FieldType::DECIMAL);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::DATE32));
+    ASSERT_EQ(result, FieldType::DATE);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::LIST));
+    ASSERT_EQ(result, FieldType::ARRAY);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::MAP));
+    ASSERT_EQ(result, FieldType::MAP);
+
+    ASSERT_OK_AND_ASSIGN(result, FieldTypeUtils::ConvertToFieldType(arrow::Type::type::STRUCT));
+    ASSERT_EQ(result, FieldType::STRUCT);
+
+    // Unsupported Arrow type id. MAX_ID is Arrow's end-of-enum sentinel, so it is a
+    // valid enumerator that names no real type. Casting an arbitrary integer such as
+    // 999 instead would fall outside the value range of arrow::Type::type (an
+    // unscoped enum without a fixed underlying type), which is undefined behavior.
+    ASSERT_NOK(FieldTypeUtils::ConvertToFieldType(arrow::Type::type::MAX_ID));
+}
+
+// FieldTypeToString: every labelled FieldType plus the fallback for an unlabelled value.
+TEST(FieldTypeUtilsTest, FieldTypeToString) {
+    struct NamedType {
+        FieldType type;
+        const char* expected_name;
+    };
+
+    // String representation expected for each FieldType.
+    const NamedType named_types[] = {
+        {FieldType::BOOLEAN, "BOOLEAN"},
+        {FieldType::TINYINT, "TINYINT"},
+        {FieldType::SMALLINT, "SMALLINT"},
+        {FieldType::INT, "INT"},
+        {FieldType::BIGINT, "BIGINT"},
+        {FieldType::FLOAT, "FLOAT"},
+        {FieldType::DOUBLE, "DOUBLE"},
+        {FieldType::STRING, "STRING"},
+        {FieldType::BINARY, "BINARY"},
+        {FieldType::BLOB, "BLOB"},
+        {FieldType::TIMESTAMP, "TIMESTAMP"},
+        {FieldType::DECIMAL, "DECIMAL"},
+        {FieldType::DATE, "DATE"},
+        {FieldType::ARRAY, "ARRAY"},
+        {FieldType::MAP, "MAP"},
+        {FieldType::STRUCT, "STRUCT"},
+    };
+    for (const NamedType& entry : named_types) {
+        ASSERT_EQ(FieldTypeUtils::FieldTypeToString(entry.type), entry.expected_name);
+    }
+
+    // A value with no label falls back to the "UNKNOWN" form that embeds the numeric id.
+    const auto unlabelled = static_cast<FieldType>(999);
+    ASSERT_EQ(FieldTypeUtils::FieldTypeToString(unlabelled), "UNKNOWN, type id:999");
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/file_type.cpp 
b/src/paimon/common/utils/file_type.cpp
new file mode 100644
index 0000000..6554e44
--- /dev/null
+++ b/src/paimon/common/utils/file_type.cpp
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/file_type.h"
+
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/string_utils.h"
+
+namespace paimon {
+
+namespace {
+
+// File-name prefixes and suffixes consulted by FileTypeUtils::Classify.
+constexpr char SNAPSHOT_PREFIX[] = "snapshot-";
+constexpr char SCHEMA_PREFIX[] = "schema-";
+constexpr char STATISTICS_PREFIX[] = "stat-";
+constexpr char TAG_PREFIX[] = "tag-";
+constexpr char CONSUMER_PREFIX[] = "consumer-";
+constexpr char SERVICE_PREFIX[] = "service-";
+constexpr char INDEX_PATH_SUFFIX[] = ".index";
+constexpr char INDEX_PREFIX[] = "index-";
+constexpr char CHANGELOG_PREFIX[] = "changelog-";
+
+// Substrings, directory name and temp-file suffix consulted during classification.
+constexpr char MANIFEST[] = "manifest";
+constexpr char CHANGELOG_DIR[] = "changelog";
+constexpr char GLOBAL_INDEX_INFIX[] = "global-index-";
+constexpr char TEMP_FILE_SUFFIX[] = ".tmp";
+
+// Strips the temp-file wrapper ".{originalName}.{UUID}.tmp" and returns
+// {originalName}. A name that does not have that shape is returned unchanged.
+// Only the lengths and the two dots are checked; the UUID text itself is not
+// validated.
+std::string UnwrapTempFileName(const std::string& name) {
+    // ".{UUID}.tmp" is 41 chars: 1 (dot) + 36 (UUID) + 4 (".tmp").
+    constexpr size_t kWrapperTailLength = 41;
+    // Leading dot + at least one name char + the 41-char tail.
+    constexpr size_t kMinWrappedLength = 43;
+
+    const bool has_wrapper_shape = name.size() >= kMinWrappedLength && name[0] == '.' &&
+                                   StringUtils::EndsWith(name, TEMP_FILE_SUFFIX);
+    if (!has_wrapper_shape) {
+        return name;
+    }
+
+    // Position where the dot separating the original name from the UUID must sit.
+    const size_t uuid_dot_pos = name.size() - kWrapperTailLength;
+    if (name[uuid_dot_pos] == '.') {
+        // Drop the leading dot and everything from the UUID dot onwards.
+        return name.substr(1, uuid_dot_pos - 1);
+    }
+    return name;
+}
+
+}  // namespace
+
+// Returns true when `file_type` is one of the three index categories
+// (bucket, global or file index) and false for every other value.
+bool FileTypeUtils::IsIndex(FileType file_type) {
+    switch (file_type) {
+        case FileType::kBucketIndex:
+        case FileType::kGlobalIndex:
+        case FileType::kFileIndex:
+            return true;
+        default:
+            return false;
+    }
+}
+
+// Returns the lower-case label of `file_type`.
+std::string FileTypeUtils::ToString(FileType file_type) {
+    // "data" is both the label of kData and the fallback for any value that is
+    // not a declared enumerator.
+    const char* label = "data";
+    switch (file_type) {
+        case FileType::kMeta:
+            label = "meta";
+            break;
+        case FileType::kData:
+            break;
+        case FileType::kBucketIndex:
+            label = "bucket_index";
+            break;
+        case FileType::kGlobalIndex:
+            label = "global_index";
+            break;
+        case FileType::kFileIndex:
+            label = "file_index";
+            break;
+    }
+    return label;
+}
+
+// Assigns a FileType to `file_path`.
+//
+// The decision is based on the last path component after any temp-file wrapper
+// has been removed. The rules are tried in the order written below, and the
+// first match wins; a path matching none of them is kData.
+FileType FileTypeUtils::Classify(const std::string& file_path) {
+    const std::string name = UnwrapTempFileName(PathUtil::GetName(file_path));
+
+    const auto has_prefix = [&name](const char* prefix) {
+        return StringUtils::StartsWith(name, prefix);
+    };
+    const auto has_suffix = [&name](const char* suffix) {
+        return StringUtils::EndsWith(name, suffix);
+    };
+
+    // 1. Prefixes that mark a metadata file.
+    const char* const meta_prefixes[] = {SNAPSHOT_PREFIX, SCHEMA_PREFIX,   STATISTICS_PREFIX,
+                                         TAG_PREFIX,      CONSUMER_PREFIX, SERVICE_PREFIX};
+    for (const char* meta_prefix : meta_prefixes) {
+        if (has_prefix(meta_prefix)) {
+            return FileType::kMeta;
+        }
+    }
+
+    // 2. "*.index" files: global index when the infix is present, file index otherwise.
+    if (has_suffix(INDEX_PATH_SUFFIX)) {
+        const bool is_global = name.find(GLOBAL_INDEX_INFIX) != std::string::npos;
+        return is_global ? FileType::kGlobalIndex : FileType::kFileIndex;
+    }
+
+    // 3. Anything mentioning "manifest" is metadata. This is tested before the
+    //    "index-" prefix, so "index-manifest-*" is metadata, not a bucket index.
+    if (name.find(MANIFEST) != std::string::npos) {
+        return FileType::kMeta;
+    }
+
+    // 4. Remaining "index-*" files are bucket indexes.
+    if (has_prefix(INDEX_PREFIX)) {
+        return FileType::kBucketIndex;
+    }
+
+    // 5. Hint files and success markers are metadata.
+    if (name == "EARLIEST" || name == "LATEST" || has_suffix("_SUCCESS")) {
+        return FileType::kMeta;
+    }
+
+    // 6. "changelog-*" is metadata only when its direct parent directory is
+    //    named "changelog"; everywhere else it is treated as data.
+    const std::string parent = PathUtil::GetParentDirPath(file_path);
+    const bool is_changelog_meta =
+        has_prefix(CHANGELOG_PREFIX) && PathUtil::GetName(parent) == CHANGELOG_DIR;
+    return is_changelog_meta ? FileType::kMeta : FileType::kData;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/file_type.h 
b/src/paimon/common/utils/file_type.h
new file mode 100644
index 0000000..7c68baf
--- /dev/null
+++ b/src/paimon/common/utils/file_type.h
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+enum class FileType {  // Category that FileTypeUtils::Classify assigns to a file path.
+    kMeta,  // Snapshot/schema/stat/tag/consumer/service files, manifests, hint and _SUCCESS files.
+    kData,  // Default category when no other rule in Classify matches.
+    kBucketIndex,  // File name starts with "index-".
+    kGlobalIndex,  // File name ends with ".index" and contains "global-index-".
+    kFileIndex,  // Any other file name ending with ".index".
+};
+
+class PAIMON_EXPORT FileTypeUtils {  // Static helpers for working with FileType values.
+ public:
+    static bool IsIndex(FileType file_type);  // True for kBucketIndex, kGlobalIndex, kFileIndex.
+    static FileType Classify(const std::string& file_path);  // Category of a path; kData if no rule matches.
+    static std::string ToString(FileType file_type);  // Lower-case label such as "meta" or "data".
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/file_type_test.cpp 
b/src/paimon/common/utils/file_type_test.cpp
new file mode 100644
index 0000000..a358b7a
--- /dev/null
+++ b/src/paimon/common/utils/file_type_test.cpp
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/file_type.h"
+
+#include "gtest/gtest.h"
+
+namespace paimon::test {
+
+// IsIndex is true for the three index categories and false for meta/data.
+TEST(FileTypeTest, TestIsIndex) {
+    for (const FileType index_type :
+         {FileType::kBucketIndex, FileType::kGlobalIndex, FileType::kFileIndex}) {
+        ASSERT_TRUE(FileTypeUtils::IsIndex(index_type));
+    }
+    for (const FileType non_index_type : {FileType::kMeta, FileType::kData}) {
+        ASSERT_FALSE(FileTypeUtils::IsIndex(non_index_type));
+    }
+}
+
+// Files whose names start with one of the metadata prefixes are classified as kMeta.
+TEST(FileTypeTest, TestMetaPrefix) {
+    const char* const meta_paths[] = {
+        "dfs://cluster/db/snapshot/snapshot-1",
+        "dfs://cluster/db/schema/schema-2",
+        "dfs://cluster/db/statistics/stat-a1b2c3d4-0",
+        "dfs://cluster/db/tag/tag-3",
+        "dfs://cluster/db/consumer/consumer-myGroup",
+        "dfs://cluster/db/service/service-4",
+    };
+    for (const char* path : meta_paths) {
+        ASSERT_EQ(FileTypeUtils::Classify(path), FileType::kMeta) << path;
+    }
+}
+
+// ".index" files split into file/global index; "index-*" files are bucket indexes.
+TEST(FileTypeTest, TestIndexTypes) {
+    struct Expectation {
+        const char* path;
+        FileType expected;
+    };
+    const Expectation expectations[] = {
+        {"dfs://cluster/db/index/bitmap.index", FileType::kFileIndex},
+        {"dfs://cluster/db/index/btree-global-index-a1b2.index", FileType::kGlobalIndex},
+        {"dfs://cluster/db/index/bitmap-global-index-1.index", FileType::kGlobalIndex},
+        {"dfs://cluster/db/index/lumina-vector-ann-global-index-a1b2.index",
+         FileType::kGlobalIndex},
+        {"dfs://cluster/db/index/tantivy-fulltext-global-index-a1b2.index",
+         FileType::kGlobalIndex},
+        {"dfs://cluster/db/p=1/bucket-0/index-abcdef-1", FileType::kBucketIndex},
+    };
+    for (const Expectation& item : expectations) {
+        ASSERT_EQ(FileTypeUtils::Classify(item.path), item.expected) << item.path;
+    }
+}
+
+// Manifests, success markers, hint files and changelog-dir files are all kMeta.
+TEST(FileTypeTest, TestMetaSpecialNames) {
+    const char* const meta_paths[] = {
+        "dfs://cluster/db/manifest/manifest-a1b2c3d4-0",
+        "dfs://cluster/db/manifest/manifest-list-1",
+        "dfs://cluster/db/manifest/index-manifest-a1b2c3d4-0",
+        "dfs://cluster/db/_SUCCESS",
+        "dfs://cluster/db/part-1_SUCCESS",
+        "dfs://cluster/db/hint/EARLIEST",
+        "dfs://cluster/db/hint/LATEST",
+        "dfs://cluster/db/changelog/changelog-123",
+    };
+    for (const char* path : meta_paths) {
+        ASSERT_EQ(FileTypeUtils::Classify(path), FileType::kMeta) << path;
+    }
+}
+
+// Paths that match no metadata or index rule fall back to kData.
+TEST(FileTypeTest, TestDefaultData) {
+    const char* const data_paths[] = {
+        "dfs://cluster/db/p=1/bucket-0/data-1.orc",
+        "dfs://cluster/db/p=1/bucket-0/data-1.parquet",
+        "dfs://cluster/db/p=1/bucket-0/changelog-a1b2c3d4-0.orc",
+        "dfs://cluster/db/p=1/bucket-0/data-a1b2c3d4-0.blob",
+        "dfs://cluster/db/p=1/bucket-0/data-a1b2c3d4-0.vector.lance",
+        "dfs://cluster/db/random/unknown.bin",
+    };
+    for (const char* path : data_paths) {
+        ASSERT_EQ(FileTypeUtils::Classify(path), FileType::kData) << path;
+    }
+}
+
+// A "changelog" directory higher up the path must not turn bucket files into kMeta.
+TEST(FileTypeTest, TestChangelogInAncestorPathNotMisclassified) {
+    const std::string root = "hdfs://cluster/changelog/warehouse/db.db/table";
+    const char* const relative_paths[] = {
+        "/dt=2024-01-01/bucket-0/data-a1b2c3d4-0.orc",
+        "/dt=2024-01-01/bucket-0/changelog-a1b2c3d4-0.orc",
+    };
+    for (const char* relative_path : relative_paths) {
+        const std::string full_path = root + relative_path;
+        ASSERT_EQ(FileTypeUtils::Classify(full_path), FileType::kData) << full_path;
+    }
+}
+
+// Files under a branch directory are classified the same way as on the main table path.
+TEST(FileTypeTest, TestBranchPaths) {
+    const std::string branch_root = "hdfs://cluster/warehouse/db.db/table/branch/branch-dev";
+
+    struct Expectation {
+        const char* relative_path;
+        FileType expected;
+    };
+    const Expectation expectations[] = {
+        {"/snapshot/snapshot-1", FileType::kMeta},
+        {"/schema/schema-0", FileType::kMeta},
+        {"/changelog/changelog-1", FileType::kMeta},
+        {"/index/index-a1b2c3d4-0", FileType::kBucketIndex},
+        {"/index/btree-global-index-a1b2c3d4.index", FileType::kGlobalIndex},
+    };
+    for (const Expectation& item : expectations) {
+        const std::string full_path = branch_root + item.relative_path;
+        ASSERT_EQ(FileTypeUtils::Classify(full_path), item.expected) << full_path;
+    }
+}
+
+// A ".{name}.{UUID}.tmp" wrapper must be classified exactly like the wrapped {name}.
+TEST(FileTypeTest, TestTempWrappedName) {
+    // Shared ".{UUID}.tmp" tail appended to every wrapped name below.
+    const std::string wrapper_tail = ".12345678-1234-1234-1234-123456789abc.tmp";
+
+    struct Expectation {
+        const char* path_before_tail;  // Path up to and including ".{name}".
+        FileType expected;
+    };
+    const Expectation expectations[] = {
+        {"dfs://cluster/db/snapshot/.snapshot-1", FileType::kMeta},
+        {"dfs://cluster/db/schema/.schema-0", FileType::kMeta},
+        {"dfs://cluster/db/tag/.tag-myTag", FileType::kMeta},
+        {"dfs://cluster/db/consumer/.consumer-myGroup", FileType::kMeta},
+        {"dfs://cluster/db/service/.service-primary-key-lookup", FileType::kMeta},
+        {"dfs://cluster/db/snapshot/.EARLIEST", FileType::kMeta},
+        {"dfs://cluster/db/snapshot/.LATEST", FileType::kMeta},
+        {"dfs://cluster/db/p=1/bucket-0/._SUCCESS", FileType::kMeta},
+        {"dfs://cluster/db/changelog/.changelog-1", FileType::kMeta},
+        {"dfs://cluster/db/statistics/.stat-a1b2c3d4-0", FileType::kMeta},
+        {"dfs://cluster/db/p=1/bucket-0/.data-1.orc", FileType::kData},
+        {"dfs://cluster/db/index/.bitmap.index", FileType::kFileIndex},
+        {"dfs://cluster/db/index/.bitmap-global-index-1.index", FileType::kGlobalIndex},
+    };
+    for (const Expectation& item : expectations) {
+        const std::string wrapped_path = item.path_before_tail + wrapper_tail;
+        ASSERT_EQ(FileTypeUtils::Classify(wrapped_path), item.expected) << wrapped_path;
+    }
+}
+
+// Names that only partly resemble the temp wrapper are classified as-is, without unwrapping.
+TEST(FileTypeTest, TestInvalidTempWrapperFallsBackToOriginalName) {
+    struct Expectation {
+        const char* path;
+        FileType expected;
+    };
+    const Expectation expectations[] = {
+        // No leading dot -> should not unwrap.
+        {"dfs://cluster/db/snapshot/snapshot-1.12345678-1234-1234-1234-123456789abc.tmp",
+         FileType::kMeta},
+        // No .tmp suffix -> should not unwrap.
+        {"dfs://cluster/db/snapshot/.snapshot-1.12345678-1234-1234-1234-123456789abc",
+         FileType::kData},
+        // Too short -> should not unwrap.
+        {"dfs://cluster/db/snapshot/.x.tmp", FileType::kData},
+    };
+    for (const Expectation& item : expectations) {
+        ASSERT_EQ(FileTypeUtils::Classify(item.path), item.expected) << item.path;
+    }
+}
+
+}  // namespace paimon::test

Reply via email to