This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 481bb3b  feat: add projected array, projected row, field comparator 
and partition computer (#32)
481bb3b is described below

commit 481bb3bbb7fe9d5c4c2c8c408568f67ad5c7d853
Author: lszskye <[email protected]>
AuthorDate: Mon Jun 1 14:44:11 2026 +0800

    feat: add projected array, projected row, field comparator and partition 
computer (#32)
---
 .../common/utils/binary_row_partition_computer.cpp | 164 +++++++++
 .../common/utils/binary_row_partition_computer.h   |  85 +++++
 .../utils/binary_row_partition_computer_test.cpp   | 329 ++++++++++++++++++
 src/paimon/common/utils/fields_comparator.cpp      | 189 +++++++++++
 src/paimon/common/utils/fields_comparator.h        | 104 ++++++
 src/paimon/common/utils/fields_comparator_test.cpp | 331 ++++++++++++++++++
 src/paimon/common/utils/internal_row_utils.h       | 108 ++++++
 .../common/utils/internal_row_utils_test.cpp       |  41 +++
 src/paimon/common/utils/projected_array.h          | 170 ++++++++++
 src/paimon/common/utils/projected_array_test.cpp   | 370 +++++++++++++++++++++
 src/paimon/common/utils/projected_row.h            | 173 ++++++++++
 src/paimon/common/utils/projected_row_test.cpp     | 128 +++++++
 src/paimon/common/utils/rapidjson_util.h           |  23 +-
 src/paimon/common/utils/rapidjson_util_test.cpp    |  23 +-
 14 files changed, 2212 insertions(+), 26 deletions(-)

diff --git a/src/paimon/common/utils/binary_row_partition_computer.cpp 
b/src/paimon/common/utils/binary_row_partition_computer.cpp
new file mode 100644
index 0000000..b9b0685
--- /dev/null
+++ b/src/paimon/common/utils/binary_row_partition_computer.cpp
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/binary_row_partition_computer.h"
+
+#include <algorithm>
+#include <cstddef>
+
+#include "arrow/type.h"
+#include "fmt/format.h"
+#include "fmt/ranges.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/macros.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Copies the partition keys, schema, default partition value, prepared per-key converters and
+/// the memory pool handle into the new instance. No validation happens here.
+BinaryRowPartitionComputer::BinaryRowPartitionComputer(
+    const std::vector<std::string>& partition_keys, const std::shared_ptr<arrow::Schema>& schema,
+    const std::string& default_part_value,
+    const std::vector<PartitionConverter>& partition_converters,
+    const std::shared_ptr<MemoryPool>& memory_pool)
+    : memory_pool_{memory_pool},
+      partition_keys_{partition_keys},
+      schema_{schema},
+      default_part_value_{default_part_value},
+      partition_converters_{partition_converters} {
+}
+
+/// Builds a partition computer for `partition_keys`, resolving each key against `schema`.
+///
+/// For every partition key the Arrow type id is looked up by name and a pair of converters
+/// (string -> binary row field, binary row field -> string) is prepared.
+///
+/// @return Status::Invalid when `schema` or `memory_pool` is null, or when a partition key is
+///         not found in `schema`; errors raised while creating a converter are propagated.
+Result<std::unique_ptr<BinaryRowPartitionComputer>> BinaryRowPartitionComputer::Create(
+    const std::vector<std::string>& partition_keys, const std::shared_ptr<arrow::Schema>& schema,
+    const std::string& default_part_value, bool legacy_partition_name_enabled,
+    const std::shared_ptr<MemoryPool>& memory_pool) {
+    if (PAIMON_UNLIKELY(schema == nullptr)) {
+        return Status::Invalid(
+            "create binary row partition computer failed, schema is null pointer.");
+    }
+    if (PAIMON_UNLIKELY(memory_pool == nullptr)) {
+        return Status::Invalid(
+            "create binary row partition computer failed, memory pool is null pointer.");
+    }
+    std::vector<PartitionConverter> partition_converters;
+    // Exactly one entry is appended per key, so size the vector once up front.
+    partition_converters.reserve(partition_keys.size());
+    for (const auto& partition_key : partition_keys) {
+        PAIMON_ASSIGN_OR_RAISE(arrow::Type::type type_id,
+                               GetTypeFromArrowSchema(schema, partition_key));
+        PAIMON_ASSIGN_OR_RAISE(
+            DataConverterUtils::StrToBinaryRowConverter converter,
+            DataConverterUtils::CreateDataToBinaryRowConverter(type_id, memory_pool.get()));
+        PAIMON_ASSIGN_OR_RAISE(DataConverterUtils::BinaryRowFieldToStrConverter reconverter,
+                               DataConverterUtils::CreateBinaryRowFieldToStringConverter(
+                                   type_id, legacy_partition_name_enabled));
+        partition_converters.emplace_back(partition_key, std::move(converter),
+                                          std::move(reconverter));
+    }
+    // The constructor is private, hence the explicit `new` instead of std::make_unique.
+    return std::unique_ptr<BinaryRowPartitionComputer>(new BinaryRowPartitionComputer(
+        partition_keys, schema, default_part_value, partition_converters, memory_pool));
+}
+
+/// Encodes a partition spec (partition key -> string value) as a BinaryRow with one field per
+/// prepared converter, in converter order.
+///
+/// A value equal to the default partition value is written as null. Returns Status::Invalid
+/// when a partition key is missing from `partition`; conversion errors are propagated.
+Result<BinaryRow> BinaryRowPartitionComputer::ToBinaryRow(
+    const std::map<std::string, std::string>& partition) const {
+    BinaryRow binary_row(partition_converters_.size());
+    BinaryRowWriter writer(&binary_row, /*initial_size=*/0, memory_pool_.get());
+    for (size_t pos = 0; pos < partition_converters_.size(); pos++) {
+        const auto& entry = partition_converters_[pos];
+        const auto found = partition.find(entry.partition_key);
+        if (found == partition.end()) {
+            return Status::Invalid(
+                fmt::format("can not find partition key '{}' in input partition '{}'",
+                            entry.partition_key, partition));
+        }
+        const auto& raw_value = found->second;
+        if (raw_value != default_part_value_) {
+            PAIMON_RETURN_NOT_OK(entry.converter(raw_value, pos, &writer));
+            continue;
+        }
+        // TODO(yonghao.fyh): once decimal/timestamp partitions are supported, use
+        // WriteTimestamp(null) for non compact precision
+        writer.SetNullAt(pos);
+    }
+    writer.Complete();
+    return binary_row;
+}
+
+/// Decodes a partition BinaryRow into (partition key, string value) pairs, in converter order.
+///
+/// Null fields, and fields whose string form is null or whitespace-only, are reported as the
+/// default partition value. Returns Status::Invalid when the row's field count differs from the
+/// number of prepared converters; conversion errors are propagated.
+Result<std::vector<std::pair<std::string, std::string>>>
+BinaryRowPartitionComputer::GeneratePartitionVector(const BinaryRow& partition) const {
+    if (static_cast<size_t>(partition.GetFieldCount()) != partition_converters_.size()) {
+        return Status::Invalid(fmt::format(
+            "partition binary row field count {} not match with partition converter size {}",
+            partition.GetFieldCount(), partition_converters_.size()));
+    }
+    std::vector<std::pair<std::string, std::string>> result;
+    // Exactly one pair is produced per converter, so allocate once.
+    result.reserve(partition_converters_.size());
+    for (size_t field_idx = 0; field_idx < partition_converters_.size(); field_idx++) {
+        const auto& partition_extractor = partition_converters_[field_idx];
+        const auto& partition_key = partition_extractor.partition_key;
+        const auto& to_str = partition_extractor.reconverter;
+        if (partition.IsNullAt(field_idx)) {
+            result.emplace_back(partition_key, default_part_value_);
+            continue;
+        }
+        PAIMON_ASSIGN_OR_RAISE(std::string partition_field_str, to_str(partition, field_idx));
+        if (StringUtils::IsNullOrWhitespaceOnly(partition_field_str)) {
+            result.emplace_back(partition_key, default_part_value_);
+        } else {
+            // The local string is not used again; move it instead of copying.
+            result.emplace_back(partition_key, std::move(partition_field_str));
+        }
+    }
+    return result;
+}
+
+/// Looks up `field_name` in `schema` and returns the Arrow type id of that field.
+/// A null result from `GetFieldByName` is reported as Status::Invalid.
+/// `schema` is dereferenced without a null check.
+Result<arrow::Type::type> BinaryRowPartitionComputer::GetTypeFromArrowSchema(
+    const std::shared_ptr<arrow::Schema>& schema, const std::string& field_name) {
+    const auto matched = schema->GetFieldByName(field_name);
+    if (matched != nullptr) {
+        return matched->type()->id();
+    }
+    return Status::Invalid(
+        fmt::format("field {} not in schema {}", field_name, schema->ToString()));
+}
+
+/// Renders the fields of `partition` (typed by `partition_type`) as strings joined with
+/// `delimiter`, keeping at most `max_length` bytes of the joined result.
+///
+/// Null fields are rendered as "null". Field converters are created with
+/// legacy_partition_name_enabled = true. The truncation is byte based (std::string::substr), so
+/// it may cut a multi-byte character; a negative `max_length` converts to a very large unsigned
+/// length and therefore does not truncate.
+///
+/// @return Status::Invalid when `partition_type` is null or `partition` has fewer fields than
+///         `partition_type`; converter creation and conversion errors are propagated.
+Result<std::string> BinaryRowPartitionComputer::PartToSimpleString(
+    const std::shared_ptr<arrow::Schema>& partition_type, const BinaryRow& partition,
+    const std::string& delimiter, int32_t max_length) {
+    if (PAIMON_UNLIKELY(partition_type == nullptr)) {
+        return Status::Invalid("part to simple string failed, partition type is null pointer.");
+    }
+    const auto field_count = static_cast<size_t>(partition_type->num_fields());
+    // Every schema field is read from the row below; reject rows that are too short instead of
+    // reading past their last field.
+    if (PAIMON_UNLIKELY(static_cast<size_t>(partition.GetFieldCount()) < field_count)) {
+        return Status::Invalid(fmt::format(
+            "partition binary row field count {} is less than partition type field count {}",
+            partition.GetFieldCount(), field_count));
+    }
+    std::vector<DataConverterUtils::BinaryRowFieldToStrConverter> partition_converters;
+    partition_converters.reserve(field_count);
+    for (const auto& field : partition_type->fields()) {
+        PAIMON_ASSIGN_OR_RAISE(DataConverterUtils::BinaryRowFieldToStrConverter converter,
+                               DataConverterUtils::CreateBinaryRowFieldToStringConverter(
+                                   field->type()->id(), /*legacy_partition_name_enabled=*/true));
+        partition_converters.emplace_back(std::move(converter));
+    }
+    std::vector<std::string> partition_vec;
+    partition_vec.reserve(partition_converters.size());
+    for (size_t field_idx = 0; field_idx < partition_converters.size(); field_idx++) {
+        const auto& to_str = partition_converters[field_idx];
+        if (partition.IsNullAt(field_idx)) {
+            partition_vec.emplace_back("null");
+        } else {
+            PAIMON_ASSIGN_OR_RAISE(std::string partition_field_str, to_str(partition, field_idx));
+            partition_vec.push_back(std::move(partition_field_str));
+        }
+    }
+    return fmt::format("{}", fmt::join(partition_vec, delimiter)).substr(0, max_length);
+}
+}  // namespace paimon
diff --git a/src/paimon/common/utils/binary_row_partition_computer.h 
b/src/paimon/common/utils/binary_row_partition_computer.h
new file mode 100644
index 0000000..ae371bf
--- /dev/null
+++ b/src/paimon/common/utils/binary_row_partition_computer.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/type.h"
+#include "paimon/common/utils/data_converter_utils.h"
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+
+namespace paimon {
+
+class BinaryRow;
+class MemoryPool;
+
+// TODO(yonghao.fyh): rethink naming of converter/reconverter
+/// Bundles one partition key with the two converters used for it: `converter` writes the key's
+/// string value into a binary row field, `reconverter` renders a binary row field as a string.
+struct PartitionConverter {
+    /// Copies the key and takes over both converters.
+    ///
+    /// A named rvalue-reference parameter is itself an lvalue, so each converter has to go
+    /// through std::move to be moved, rather than copied, into its member.
+    PartitionConverter(const std::string& _partition_key,
+                       typename DataConverterUtils::StrToBinaryRowConverter&& _converter,
+                       typename DataConverterUtils::BinaryRowFieldToStrConverter&& _reconverter)
+        : partition_key(_partition_key),
+          converter(std::move(_converter)),
+          reconverter(std::move(_reconverter)) {}
+    std::string partition_key;
+    typename DataConverterUtils::StrToBinaryRowConverter converter;
+    typename DataConverterUtils::BinaryRowFieldToStrConverter reconverter;
+};
+
+/// PartitionComputer for `BinaryRow`: converts partition specs between their string form
+/// (partition key -> value) and a `BinaryRow` holding one field per partition key.
+class BinaryRowPartitionComputer {
+ public:
+    /// Creates a computer for `partition_keys`; each key's type is looked up by name in `schema`.
+    /// Returns an invalid status when `schema` or `memory_pool` is null, or when a partition key
+    /// is not found in `schema`.
+    static Result<std::unique_ptr<BinaryRowPartitionComputer>> Create(
+        const std::vector<std::string>& partition_keys,
+        const std::shared_ptr<arrow::Schema>& schema, const std::string& 
default_part_value,
+        bool legacy_partition_name_enabled, const std::shared_ptr<MemoryPool>& 
memory_pool);
+
+    /// Encodes `partition` (partition key -> string value) as a `BinaryRow`. A value equal to the
+    /// default partition value is written as null. Fails when a partition key is missing from
+    /// `partition` or a value cannot be converted.
+    Result<BinaryRow> ToBinaryRow(const std::map<std::string, std::string>& 
partition) const;
+    /// Decodes `partition` into (partition key, string value) pairs. Null fields and fields whose
+    /// string form is null or whitespace-only are reported as the default partition value. Fails
+    /// when the row's field count differs from the number of partition converters.
+    Result<std::vector<std::pair<std::string, std::string>>> 
GeneratePartitionVector(
+        const BinaryRow& partition) const;
+    /// Returns the partition keys this computer was constructed with.
+    const std::vector<std::string>& GetPartitionKeys() const {
+        return partition_keys_;
+    }
+
+    /// Renders the fields of `partition` (typed by `partition_type`) as strings joined with
+    /// `delimiter`, with null fields shown as "null", keeping at most `max_length` bytes.
+    static Result<std::string> PartToSimpleString(
+        const std::shared_ptr<arrow::Schema>& partition_type, const BinaryRow& 
partition,
+        const std::string& delimiter, int32_t max_length);
+
+ private:
+    // Only reachable through Create(), which prepares `partition_converters`.
+    BinaryRowPartitionComputer(const std::vector<std::string>& partition_keys,
+                               const std::shared_ptr<arrow::Schema>& schema,
+                               const std::string& default_part_value,
+                               const std::vector<PartitionConverter>& 
partition_converters,
+                               const std::shared_ptr<MemoryPool>& memory_pool);
+
+    // Returns the Arrow type id of `field_name` in `schema`, or an invalid status if the field
+    // lookup yields null.
+    static Result<arrow::Type::type> GetTypeFromArrowSchema(
+        const std::shared_ptr<arrow::Schema>& schema, const std::string& 
field_name);
+
+    // Pool handed to the BinaryRowWriter when building rows.
+    std::shared_ptr<MemoryPool> memory_pool_;
+    std::vector<std::string> partition_keys_;
+    std::shared_ptr<arrow::Schema> schema_;
+    // String that stands for a null partition value.
+    std::string default_part_value_;
+    // One entry per partition key, in partition key order.
+    std::vector<PartitionConverter> partition_converters_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/binary_row_partition_computer_test.cpp 
b/src/paimon/common/utils/binary_row_partition_computer_test.cpp
new file mode 100644
index 0000000..119cb1b
--- /dev/null
+++ b/src/paimon/common/utils/binary_row_partition_computer_test.cpp
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/binary_row_partition_computer.h"
+
+#include <cstdint>
+#include <limits>
+#include <string>
+#include <variant>
+
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Round-trips partition specs through ToBinaryRow() and GeneratePartitionVector() for both
+// legacy_partition_name_enabled settings, then covers default-partition handling and the two
+// error paths (missing partition key, value that does not match the field type).
+TEST(BinaryRowPartitionComputerTest, TestToAndFromBinaryRow) {
+    auto pool = GetDefaultPool();
+    arrow::FieldVector fields = {arrow::field("f0", arrow::boolean()),
+                                 arrow::field("f1", arrow::int8()),
+                                 arrow::field("f2", arrow::int8()),
+                                 arrow::field("f3", arrow::int16()),
+                                 arrow::field("f4", arrow::int16()),
+                                 arrow::field("f5", arrow::int32()),
+                                 arrow::field("f6", arrow::int32()),
+                                 arrow::field("f7", arrow::int64()),
+                                 arrow::field("f8", arrow::int64()),
+                                 arrow::field("f9", arrow::float32()),
+                                 arrow::field("f10", arrow::float64()),
+                                 arrow::field("f11", arrow::utf8()),
+                                 arrow::field("f12", arrow::utf8()),
+                                 arrow::field("f13", arrow::date32()),
+                                 arrow::field("non-partition-field", arrow::int32())};
+
+    auto schema = arrow::schema(fields);
+    // "f2" is listed before "f1": row field 1 holds f2 and row field 2 holds f1.
+    std::vector<std::string> partition_keys = {"f0", "f2", "f1", "f3",  "f4",  "f5",  "f6",
+                                               "f7", "f8", "f9", "f10", "f11", "f12", "f13"};
+    {
+        // simple case with legacy_partition_name_enabled = true
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<BinaryRowPartitionComputer> computer,
+            BinaryRowPartitionComputer::Create(partition_keys, schema, "__DEFAULT_PARTITION__",
+                                               /*legacy_partition_name_enabled=*/true, pool));
+        std::map<std::string, std::string> partition_map = {
+            {"f0", "true"},
+            {"f1", "10"},
+            {"f2", "-20"},
+            {"f3", "1556"},
+            {"f4", "-2556"},
+            {"f5", "348489"},
+            {"f6", "-448489"},
+            {"f7", "-9223372036854775808"},
+            {"f8", "182737474"},
+            {"f9", "0.334"},
+            {"f10", "467.66472"},
+            {"f11", "abcde"},
+            {"f12", "这是一个很长很长的中文"},
+            {"f13", "5"},
+        };
+        ASSERT_OK_AND_ASSIGN(BinaryRow row, computer->ToBinaryRow(partition_map));
+        ASSERT_EQ(14, row.GetFieldCount());
+        ASSERT_EQ(true, row.GetBoolean(0));
+        ASSERT_EQ(-20, row.GetByte(1));
+        ASSERT_EQ(10, row.GetByte(2));
+        ASSERT_EQ(1556, row.GetShort(3));
+        ASSERT_EQ(-2556, row.GetShort(4));
+        ASSERT_EQ(348489, row.GetInt(5));
+        ASSERT_EQ(-448489, row.GetInt(6));
+        ASSERT_EQ(std::numeric_limits<int64_t>::min(), row.GetLong(7));
+        ASSERT_EQ(182737474L, row.GetLong(8));
+        ASSERT_NEAR(0.334, row.GetFloat(9), 0.0000001);
+        ASSERT_NEAR(467.66472, row.GetDouble(10), 0.0000001);
+        ASSERT_EQ("abcde", row.GetString(11).ToString());
+        ASSERT_EQ("这是一个很长很长的中文", row.GetString(12).ToString());
+        ASSERT_EQ(5, row.GetDate(13));
+
+        std::vector<std::pair<std::string, std::string>> part_values;
+        ASSERT_OK_AND_ASSIGN(part_values, computer->GeneratePartitionVector(row));
+        ASSERT_EQ(14, part_values.size());
+        std::map<std::string, std::string> actual_part_values_map;
+        for (const auto& [key, value] : part_values) {
+            actual_part_values_map[key] = value;
+        }
+        ASSERT_EQ(actual_part_values_map, partition_map);
+    }
+    {
+        // simple case with legacy_partition_name_enabled = false
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<BinaryRowPartitionComputer> computer,
+            BinaryRowPartitionComputer::Create(partition_keys, schema, "__DEFAULT_PARTITION__",
+                                               /*legacy_partition_name_enabled=*/false, pool));
+        std::map<std::string, std::string> partition_map = {
+            {"f0", "true"},
+            {"f1", "10"},
+            {"f2", "-20"},
+            {"f3", "1556"},
+            {"f4", "-2556"},
+            {"f5", "348489"},
+            {"f6", "-448489"},
+            {"f7", "-9223372036854775808"},
+            {"f8", "182737474"},
+            {"f9", "0.334"},
+            {"f10", "467.66472"},
+            {"f11", "abcde"},
+            {"f12", "这是一个很长很长的中文"},
+            {"f13", "1970-01-06"},
+        };
+        ASSERT_OK_AND_ASSIGN(BinaryRow row, computer->ToBinaryRow(partition_map));
+        ASSERT_EQ(14, row.GetFieldCount());
+        ASSERT_EQ(true, row.GetBoolean(0));
+        ASSERT_EQ(-20, row.GetByte(1));
+        ASSERT_EQ(10, row.GetByte(2));
+        ASSERT_EQ(1556, row.GetShort(3));
+        ASSERT_EQ(-2556, row.GetShort(4));
+        ASSERT_EQ(348489, row.GetInt(5));
+        ASSERT_EQ(-448489, row.GetInt(6));
+        ASSERT_EQ(std::numeric_limits<int64_t>::min(), row.GetLong(7));
+        ASSERT_EQ(182737474L, row.GetLong(8));
+        ASSERT_NEAR(0.334, row.GetFloat(9), 0.0000001);
+        ASSERT_NEAR(467.66472, row.GetDouble(10), 0.0000001);
+        ASSERT_EQ("abcde", row.GetString(11).ToString());
+        ASSERT_EQ("这是一个很长很长的中文", row.GetString(12).ToString());
+        ASSERT_EQ(5, row.GetDate(13));
+
+        std::vector<std::pair<std::string, std::string>> part_values;
+        ASSERT_OK_AND_ASSIGN(part_values, computer->GeneratePartitionVector(row));
+        ASSERT_EQ(14, part_values.size());
+        std::map<std::string, std::string> actual_part_values_map;
+        for (const auto& [key, value] : part_values) {
+            actual_part_values_map[key] = value;
+        }
+        ASSERT_EQ(actual_part_values_map, partition_map);
+    }
+    {
+        // simple case with default partition value: f12 equals the default value and is written
+        // as null; f11 is whitespace-only, is written as-is, and reads back as the default value
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<BinaryRowPartitionComputer> computer,
+            BinaryRowPartitionComputer::Create(partition_keys, schema, "__DEFAULT_PARTITION__",
+                                               /*legacy_partition_name_enabled=*/true, pool));
+        std::map<std::string, std::string> partition_map = {
+            {"f0", "true"},
+            {"f1", "10"},
+            {"f2", "-20"},
+            {"f3", "1556"},
+            {"f4", "-2556"},
+            {"f5", "348489"},
+            {"f6", "-448489"},
+            {"f7", "-9223372036854775808"},
+            {"f8", "182737474"},
+            {"f9", "0.334"},
+            {"f10", "467.66472"},
+            {"f11", " "},
+            {"f12", "__DEFAULT_PARTITION__"},
+            {"f13", "5"},
+        };
+        ASSERT_OK_AND_ASSIGN(BinaryRow row, computer->ToBinaryRow(partition_map));
+        ASSERT_EQ(14, row.GetFieldCount());
+        ASSERT_EQ(true, row.GetBoolean(0));
+        ASSERT_EQ(-20, row.GetByte(1));
+        ASSERT_EQ(10, row.GetByte(2));
+        ASSERT_EQ(1556, row.GetShort(3));
+        ASSERT_EQ(-2556, row.GetShort(4));
+        ASSERT_EQ(348489, row.GetInt(5));
+        ASSERT_EQ(-448489, row.GetInt(6));
+        ASSERT_EQ(std::numeric_limits<int64_t>::min(), row.GetLong(7));
+        ASSERT_EQ(182737474L, row.GetLong(8));
+        ASSERT_NEAR(0.334, row.GetFloat(9), 0.0000001);
+        ASSERT_NEAR(467.66472, row.GetDouble(10), 0.0000001);
+        ASSERT_EQ(" ", row.GetString(11).ToString());
+        ASSERT_TRUE(row.IsNullAt(12));
+        // f13 is a date32 column: read it with the date accessor, as the other cases do.
+        ASSERT_EQ(5, row.GetDate(13));
+
+        std::vector<std::pair<std::string, std::string>> part_values;
+        ASSERT_OK_AND_ASSIGN(part_values, computer->GeneratePartitionVector(row));
+        ASSERT_EQ(14, part_values.size());
+        std::map<std::string, std::string> actual_part_values_map;
+        for (const auto& [key, value] : part_values) {
+            actual_part_values_map[key] = value;
+        }
+        std::map<std::string, std::string> expected_map = {
+            {"f0", "true"},
+            {"f1", "10"},
+            {"f2", "-20"},
+            {"f3", "1556"},
+            {"f4", "-2556"},
+            {"f5", "348489"},
+            {"f6", "-448489"},
+            {"f7", "-9223372036854775808"},
+            {"f8", "182737474"},
+            {"f9", "0.334"},
+            {"f10", "467.66472"},
+            {"f11", "__DEFAULT_PARTITION__"},
+            {"f12", "__DEFAULT_PARTITION__"},
+            {"f13", "5"},
+        };
+        ASSERT_EQ(actual_part_values_map, expected_map);
+    }
+    {
+        // test partition_str does not contain all partition keys, f4
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<BinaryRowPartitionComputer> computer,
+            BinaryRowPartitionComputer::Create(partition_keys, schema, "__DEFAULT_PARTITION__",
+                                               /*legacy_partition_name_enabled=*/true, pool));
+        std::map<std::string, std::string> partition_map = {{"f0", "true"},
+                                                            {"f1", "10"},
+                                                            {"f2", "-20"},
+                                                            {"f3", "1556"},
+                                                            {"f5", "348489"},
+                                                            {"f6", "-448489"},
+                                                            {"f7", "-9223372036854775808"},
+                                                            {"f8", "182737474"},
+                                                            {"f9", "0.334"},
+                                                            {"f10", "467.66472"},
+                                                            {"f11", "abcde"},
+                                                            {"f12", "这是一个很长很长的中文"}};
+
+        ASSERT_NOK_WITH_MSG(computer->ToBinaryRow(partition_map),
+                            "can not find partition key 'f4' in input partition");
+    }
+    {
+        // test partition_str mismatches schema, f6="abcd"
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<BinaryRowPartitionComputer> computer,
+            BinaryRowPartitionComputer::Create(partition_keys, schema, "__DEFAULT_PARTITION__",
+                                               /*legacy_partition_name_enabled=*/true, pool));
+
+        std::map<std::string, std::string> partition_map = {{"f0", "true"},
+                                                            {"f1", "10"},
+                                                            {"f2", "-20"},
+                                                            {"f3", "1556"},
+                                                            {"f4", "-2556"},
+                                                            {"f5", "348489"},
+                                                            {"f6", "abcd"},
+                                                            {"f7", "-9223372036854775808"},
+                                                            {"f8", "182737474"},
+                                                            {"f9", "0.334"},
+                                                            {"f10", "467.66472"},
+                                                            {"f11", "abcde"},
+                                                            {"f12", "这是一个很长很长的中文"}};
+        ASSERT_NOK_WITH_MSG(computer->ToBinaryRow(partition_map),
+                            "cannot convert field idx 6, field value abcd to type INT32");
+    }
+}
+
+// Whitespace-only and empty string fields must come back as the default partition value, while
+// a string with non-whitespace content is returned unchanged.
+TEST(BinaryRowPartitionComputerTest, TestNullOrWhitespaceOnlyStr) {
+    auto pool = GetDefaultPool();
+    std::vector<std::string> partition_keys = {"f0", "f1", "f2"};
+    auto schema = arrow::schema(arrow::FieldVector{
+        arrow::field("f0", arrow::utf8()),
+        arrow::field("f1", arrow::utf8()),
+        arrow::field("f2", arrow::utf8()),
+    });
+    ASSERT_OK_AND_ASSIGN(
+        std::unique_ptr<BinaryRowPartitionComputer> computer,
+        BinaryRowPartitionComputer::Create(partition_keys, schema, "__DEFAULT_PARTITION__",
+                                           /*legacy_partition_name_enabled=*/true, pool));
+
+    // f0 = single space, f1 = empty, f2 = text followed by a trailing space
+    ASSERT_OK_AND_ASSIGN(auto partition_key_values,
+                         computer->GeneratePartitionVector(BinaryRowGenerator::GenerateRow(
+                             {std::string(" "), std::string(""), std::string("ab ")}, pool.get())));
+
+    std::vector<std::pair<std::string, std::string>> expected;
+    expected.emplace_back("f0", "__DEFAULT_PARTITION__");
+    expected.emplace_back("f1", "__DEFAULT_PARTITION__");
+    expected.emplace_back("f2", "ab ");
+    ASSERT_EQ(partition_key_values, expected);
+}
+
+// PartToSimpleString(): empty partition, plain join, null rendering and truncation.
+TEST(BinaryRowPartitionComputerTest, TestPartToSimpleString) {
+    auto pool = GetDefaultPool();
+    // utf8 + int32 partition type shared by all non-empty cases
+    auto two_field_schema = arrow::schema({
+        arrow::field("f0", arrow::utf8()),
+        arrow::field("f1", arrow::int32()),
+    });
+    {
+        // no partition fields -> empty string
+        auto empty_schema = arrow::schema({});
+        auto empty_partition = BinaryRow::EmptyRow();
+        ASSERT_OK_AND_ASSIGN(std::string simple_str,
+                             BinaryRowPartitionComputer::PartToSimpleString(
+                                 empty_schema, empty_partition, "-", 30));
+        ASSERT_EQ(simple_str, "");
+    }
+    {
+        // fields are joined with the delimiter
+        auto partition = BinaryRowGenerator::GenerateRow({std::string("20240731"), 10}, pool.get());
+        ASSERT_OK_AND_ASSIGN(std::string simple_str,
+                             BinaryRowPartitionComputer::PartToSimpleString(
+                                 two_field_schema, partition, "-", 30));
+        ASSERT_EQ(simple_str, "20240731-10");
+    }
+    {
+        // a null field is rendered as "null"
+        auto partition = BinaryRowGenerator::GenerateRow({NullType(), 10}, pool.get());
+        ASSERT_OK_AND_ASSIGN(std::string simple_str,
+                             BinaryRowPartitionComputer::PartToSimpleString(
+                                 two_field_schema, partition, "-", 30));
+        ASSERT_EQ(simple_str, "null-10");
+    }
+    {
+        // the joined string is cut to max_length
+        auto partition = BinaryRowGenerator::GenerateRow({std::string("20240731"), 10}, pool.get());
+        ASSERT_OK_AND_ASSIGN(std::string simple_str,
+                             BinaryRowPartitionComputer::PartToSimpleString(
+                                 two_field_schema, partition, "-", 5));
+        ASSERT_EQ(simple_str, "20240");
+    }
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/fields_comparator.cpp 
b/src/paimon/common/utils/fields_comparator.cpp
new file mode 100644
index 0000000..54a5cfd
--- /dev/null
+++ b/src/paimon/common/utils/fields_comparator.cpp
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/fields_comparator.h"
+
+#include <cstddef>
+#include <string>
+
+#include "arrow/api.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/status.h"
+
+namespace paimon {
+/// Builds a comparator that uses every field of `input_data_field` as a sort
+/// key, in declaration order, by delegating to the overload that takes an
+/// explicit list of sort-field indices.
+Result<std::unique_ptr<FieldsComparator>> FieldsComparator::Create(
+    const std::vector<DataField>& input_data_field, bool is_ascending_order) {
+    // Identity projection: sort key k is row field k.
+    std::vector<int32_t> all_fields(input_data_field.size());
+    int32_t next_idx = 0;
+    for (auto& slot : all_fields) {
+        slot = next_idx++;
+    }
+    return Create(input_data_field, all_fields, is_ascending_order);
+}
+
+/// Builds a comparator over the fields selected by `sort_fields`.
+///
+/// @param input_data_field   all fields of the rows that will be compared.
+/// @param sort_fields        indices into `input_data_field`, most significant key first.
+/// @param is_ascending_order direction applied to non-null value comparisons.
+/// @return the comparator, or an error status when a sort index is out of range
+///         or a selected field has a type `CompareField` does not support.
+Result<std::unique_ptr<FieldsComparator>> FieldsComparator::Create(
+    const std::vector<DataField>& input_data_field, const std::vector<int32_t>& sort_fields,
+    bool is_ascending_order) {
+    const auto field_count = static_cast<int32_t>(input_data_field.size());
+    std::vector<FieldComparatorFunc> comparators;
+    comparators.reserve(sort_fields.size());
+    for (const int32_t sort_field_idx : sort_fields) {
+        // Reject bad indices up front; indexing the vector with them would be undefined behaviour.
+        if (sort_field_idx < 0 || sort_field_idx >= field_count) {
+            return Status::Invalid(
+                fmt::format("sort field idx {} is out of range, input only has {} fields",
+                            sort_field_idx, field_count));
+        }
+        const auto& type = input_data_field[sort_field_idx].Type();
+        PAIMON_ASSIGN_OR_RAISE(FieldComparatorFunc cmp, CompareField(sort_field_idx, type));
+        comparators.emplace_back(std::move(cmp));
+    }
+    return std::unique_ptr<FieldsComparator>(
+        new FieldsComparator(is_ascending_order, sort_fields, std::move(comparators)));
+}
+
+/// Compares two rows key by key, in the order given by `sort_fields_`.
+///
+/// A key that is null in both rows is a tie and the next key decides. A key
+/// that is null in only one row decides immediately: the row holding the null
+/// sorts first, and this is not flipped for descending order. Otherwise the
+/// per-field comparator decides, negated when the comparator is descending.
+///
+/// @return -1, 0 or 1.
+int32_t FieldsComparator::CompareTo(const InternalRow& lhs, const InternalRow& rhs) const {
+    // Result when only `lhs` is null: nulls come first (which is not the same as "smallest").
+    const int32_t lhs_only_null_ret = -1;
+    const size_t key_count = sort_fields_.size();
+    for (size_t pos = 0; pos < key_count; ++pos) {
+        const bool left_missing = lhs.IsNullAt(sort_fields_[pos]);
+        const bool right_missing = rhs.IsNullAt(sort_fields_[pos]);
+        if (left_missing || right_missing) {
+            if (left_missing && right_missing) {
+                continue;  // tie on this key, look at the next one
+            }
+            return left_missing ? lhs_only_null_ret : -lhs_only_null_ret;
+        }
+        const int32_t order = comparators_[pos](lhs, rhs);
+        if (order == 0) {
+            continue;
+        }
+        return is_ascending_order_ ? order : -order;
+    }
+    return 0;
+}
+
+namespace {
+/// Three-way comparison built only from `==` and `<`: 0 when equal, -1 when
+/// `left < right`, 1 otherwise.
+template <typename T>
+int32_t OrderByOperators(const T& left, const T& right) {
+    if (left == right) {
+        return 0;
+    }
+    return left < right ? -1 : 1;
+}
+
+/// Collapses an arbitrary comparison result to -1, 0 or 1.
+inline int32_t Signum(int32_t value) {
+    if (value == 0) {
+        return 0;
+    }
+    return value > 0 ? 1 : -1;
+}
+}  // namespace
+
+/// Creates the comparison function for one field.
+///
+/// The returned function reads field `field_idx` from both rows with the
+/// getter matching `input_type` and returns -1, 0 or 1. It does not check for
+/// nulls itself.
+///
+/// @return the function, or `Status::NotImplemented` for any arrow type that
+///         has no case below.
+Result<FieldsComparator::FieldComparatorFunc> FieldsComparator::CompareField(
+    int32_t field_idx, const std::shared_ptr<arrow::DataType>& input_type) {
+    const arrow::Type::type type_id = input_type->id();
+    switch (type_id) {
+        case arrow::Type::type::BOOL:
+            return FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const bool left = lhs.GetBoolean(field_idx);
+                    const bool right = rhs.GetBoolean(field_idx);
+                    return OrderByOperators(left, right);
+                });
+        case arrow::Type::type::INT8:
+            return FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const int8_t left = lhs.GetByte(field_idx);
+                    const int8_t right = rhs.GetByte(field_idx);
+                    return OrderByOperators(left, right);
+                });
+        case arrow::Type::type::INT16:
+            return FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const int16_t left = lhs.GetShort(field_idx);
+                    const int16_t right = rhs.GetShort(field_idx);
+                    return OrderByOperators(left, right);
+                });
+        case arrow::Type::type::DATE32:
+            return FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const int32_t left = lhs.GetDate(field_idx);
+                    const int32_t right = rhs.GetDate(field_idx);
+                    return OrderByOperators(left, right);
+                });
+
+        case arrow::Type::type::INT32:
+            return FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const int32_t left = lhs.GetInt(field_idx);
+                    const int32_t right = rhs.GetInt(field_idx);
+                    return OrderByOperators(left, right);
+                });
+        case arrow::Type::type::INT64:
+            return FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const int64_t left = lhs.GetLong(field_idx);
+                    const int64_t right = rhs.GetLong(field_idx);
+                    return OrderByOperators(left, right);
+                });
+        case arrow::Type::type::FLOAT:
+            // TODO(xinyu.lxy):
+            // currently in java KeyComparatorSupplier: -inf < -0.0 == +0.0 < +inf = nan
+            // paimon-cpp: -inf < -0.0 == +0.0 < +inf and nan cannot be compared
+            // (with a NaN operand both `==` and `<` are false, so the result is 1
+            // whichever side the NaN is on)
+            return FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const float left = lhs.GetFloat(field_idx);
+                    const float right = rhs.GetFloat(field_idx);
+                    return OrderByOperators(left, right);
+                });
+        case arrow::Type::type::DOUBLE:
+            return FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const double left = lhs.GetDouble(field_idx);
+                    const double right = rhs.GetDouble(field_idx);
+                    return OrderByOperators(left, right);
+                });
+        case arrow::Type::type::STRING:
+        case arrow::Type::type::BINARY: {
+            // Both types are compared through the string view's `compare`.
+            return FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    auto left = lhs.GetStringView(field_idx);
+                    auto right = rhs.GetStringView(field_idx);
+                    const int32_t raw = left.compare(right);
+                    return Signum(raw);
+                });
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            auto timestamp_type =
+                arrow::internal::checked_pointer_cast<arrow::TimestampType>(input_type);
+            assert(timestamp_type);
+            // The precision is resolved once here and captured by the comparator.
+            const int32_t precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
+            return FieldComparatorFunc(
+                [field_idx, precision](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const Timestamp left = lhs.GetTimestamp(field_idx, precision);
+                    const Timestamp right = rhs.GetTimestamp(field_idx, precision);
+                    return OrderByOperators(left, right);
+                });
+        }
+        case arrow::Type::type::DECIMAL: {
+            auto* decimal_type =
+                arrow::internal::checked_cast<arrow::Decimal128Type*>(input_type.get());
+            assert(decimal_type);
+            auto precision = decimal_type->precision();
+            auto scale = decimal_type->scale();
+            return FieldComparatorFunc(
+                [field_idx, precision, scale](const InternalRow& lhs,
+                                              const InternalRow& rhs) -> int32_t {
+                    const Decimal left = lhs.GetDecimal(field_idx, precision, scale);
+                    const Decimal right = rhs.GetDecimal(field_idx, precision, scale);
+                    const int32_t raw = left.CompareTo(right);
+                    return Signum(raw);
+                });
+        }
+        default:
+            return Status::NotImplemented(fmt::format("Do not support comparing {} type in idx {}",
+                                                      input_type->ToString(), field_idx));
+    }
+}
+}  // namespace paimon
diff --git a/src/paimon/common/utils/fields_comparator.h 
b/src/paimon/common/utils/fields_comparator.h
new file mode 100644
index 0000000..7dbeca1
--- /dev/null
+++ b/src/paimon/common/utils/fields_comparator.h
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cmath>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class DataType;
+}  // namespace arrow
+
+namespace paimon {
+class DataField;
+
+/// A `Comparator` that compares the file store key.
+///
+/// An instance holds one comparison function per sort field and applies them
+/// in order. Instances are created through the `Create` factories, which
+/// report unsupported field types as an error instead of constructing a
+/// partially usable object.
+class FieldsComparator {
+ public:
+    /// Creates a comparator that sorts on every field of `input_data_field`,
+    /// in declaration order.
+    static Result<std::unique_ptr<FieldsComparator>> Create(
+        const std::vector<DataField>& input_data_field, bool is_ascending_order);
+
+    /// Creates a comparator that sorts on the fields of `input_data_field`
+    /// selected by `sort_fields`, most significant key first.
+    static Result<std::unique_ptr<FieldsComparator>> Create(
+        const std::vector<DataField>& input_data_field, const std::vector<int32_t>& sort_fields,
+        bool is_ascending_order);
+
+    /// Three-way comparison of two rows on the configured sort fields;
+    /// returns -1, 0 or 1.
+    int32_t CompareTo(const InternalRow& lhs, const InternalRow& rhs) const;
+
+    /// Indices of the fields this comparator sorts on, in key order.
+    const std::vector<int32_t>& CompareFields() const {
+        return sort_fields_;
+    }
+
+    /// Java-compatible ordering for floating-point types:
+    /// -infinity < -0.0 < +0.0 < +infinity < NaN == NaN
+    /// for range index and sst key comparator
+    ///
+    /// Uses `std::isnan` / `std::signbit`, which are declared in <cmath>.
+    template <typename T>
+    static int32_t CompareFloatingPoint(T a, T b) {
+        const bool a_nan = std::isnan(a);
+        const bool b_nan = std::isnan(b);
+        if (a_nan && b_nan) {
+            return 0;
+        }
+        if (a_nan) {
+            return 1;
+        }
+        if (b_nan) {
+            return -1;
+        }
+        if (a == b) {
+            // `==` cannot tell -0.0 from +0.0, so the sign bit breaks the tie.
+            const bool a_neg = std::signbit(a);
+            const bool b_neg = std::signbit(b);
+            if (a_neg == b_neg) {
+                return 0;
+            }
+            return a_neg ? -1 : 1;  // -0.0 < +0.0
+        }
+        return a < b ? -1 : 1;
+    }
+
+ private:
+    /// Compares one fixed field of two rows; returns -1, 0 or 1.
+    using FieldComparatorFunc =
+        std::function<int32_t(const InternalRow& lhs, const InternalRow& rhs)>;
+
+    /// `comparators[i]` must be the comparison function for `sort_fields[i]`,
+    /// so both vectors must have the same length.
+    FieldsComparator(bool is_ascending_order, const std::vector<int32_t>& sort_fields,
+                     std::vector<FieldComparatorFunc>&& comparators)
+        : is_ascending_order_(is_ascending_order),
+          sort_fields_(sort_fields),
+          comparators_(std::move(comparators)) {
+        assert(comparators_.size() == sort_fields_.size());
+    }
+
+    /// Builds the comparison function for field `field_idx` of type `input_type`.
+    static Result<FieldComparatorFunc> CompareField(
+        int32_t field_idx, const std::shared_ptr<arrow::DataType>& input_type);
+
+    bool is_ascending_order_;
+    std::vector<int32_t> sort_fields_;
+    std::vector<FieldComparatorFunc> comparators_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/fields_comparator_test.cpp 
b/src/paimon/common/utils/fields_comparator_test.cpp
new file mode 100644
index 0000000..2f64f81
--- /dev/null
+++ b/src/paimon/common/utils/fields_comparator_test.cpp
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/fields_comparator.h"
+
+#include <cstddef>
+#include <string>
+#include <variant>
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture with helpers that assert `row1` sorts strictly before `row2`.
+class FieldsComparatorTest : public ::testing::Test {
+ public:
+    void SetUp() override {}
+    void TearDown() override {}
+
+    /// Checks that, on `sort_fields`, `row1` < `row2` in ascending order and
+    /// that each row equals itself. Unless `has_null` is set, the mirrored
+    /// expectations are also checked with a descending comparator.
+    void CheckResult(const InternalRow& row1, const InternalRow& row2,
+                     const std::vector<std::shared_ptr<arrow::DataType>>& input_types,
+                     const std::vector<int32_t>& sort_fields, bool has_null = false) {
+        std::vector<DataField> fields;
+        fields.reserve(input_types.size());
+        for (const auto& arrow_type : input_types) {
+            fields.emplace_back(/*id=*/0, arrow::field("fake_name", arrow_type));
+        }
+
+        ASSERT_OK_AND_ASSIGN(auto ascending, FieldsComparator::Create(fields, sort_fields,
+                                                                      /*is_ascending_order=*/true));
+        ASSERT_EQ(-1, ascending->CompareTo(row1, row2));
+        ASSERT_EQ(1, ascending->CompareTo(row2, row1));
+        ASSERT_EQ(0, ascending->CompareTo(row1, row1));
+        ASSERT_EQ(0, ascending->CompareTo(row2, row2));
+
+        if (has_null) {
+            // The descending expectations are skipped when nulls are involved.
+            return;
+        }
+
+        ASSERT_OK_AND_ASSIGN(auto descending,
+                             FieldsComparator::Create(fields, sort_fields,
+                                                      /*is_ascending_order=*/false));
+        ASSERT_EQ(1, descending->CompareTo(row1, row2));
+        ASSERT_EQ(-1, descending->CompareTo(row2, row1));
+        ASSERT_EQ(0, descending->CompareTo(row1, row1));
+        ASSERT_EQ(0, descending->CompareTo(row2, row2));
+    }
+
+    /// Same check using every field, in order, as the sort key.
+    void CheckResult(const InternalRow& row1, const InternalRow& row2,
+                     const std::vector<std::shared_ptr<arrow::DataType>>& input_types,
+                     bool has_null = false) {
+        std::vector<int32_t> every_field(input_types.size());
+        int32_t next_idx = 0;
+        for (auto& slot : every_field) {
+            slot = next_idx++;
+        }
+        CheckResult(row1, row2, input_types, every_field, has_null);
+    }
+};
+
+TEST_F(FieldsComparatorTest, TestSimple) {  // every scope: row1 and row2 differ in exactly one field
+    auto pool = GetDefaultPool();
+    {  // field 0 (bool) differs: false vs true
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {false, static_cast<int8_t>(10), static_cast<int16_t>(100),
+             static_cast<int32_t>(100000)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {true, static_cast<int8_t>(10), static_cast<int16_t>(100),
+             static_cast<int32_t>(100000)},
+            pool.get());
+        CheckResult(row1, row2, {arrow::boolean(), arrow::int8(), 
arrow::int16(), arrow::int32()});
+    }
+    {  // field 1 (int8) differs: 10 vs 20
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {true, static_cast<int8_t>(10), static_cast<int16_t>(100),
+             static_cast<int32_t>(100000)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {true, static_cast<int8_t>(20), static_cast<int16_t>(100),
+             static_cast<int32_t>(100000)},
+            pool.get());
+        CheckResult(row1, row2, {arrow::boolean(), arrow::int8(), 
arrow::int16(), arrow::int32()});
+    }
+    {  // field 3 (int32) differs: 100000 vs 200000
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {true, static_cast<int8_t>(10), static_cast<int16_t>(100),
+             static_cast<int32_t>(100000)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {true, static_cast<int8_t>(10), static_cast<int16_t>(100),
+             static_cast<int32_t>(200000)},
+            pool.get());
+        CheckResult(row1, row2, {arrow::boolean(), arrow::int8(), 
arrow::int16(), arrow::int32()});
+    }
+    {  // field 0 (int64) differs by one
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {static_cast<int64_t>(9223372036854775800), 
static_cast<float>(100.1), 123.45},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {static_cast<int64_t>(9223372036854775801), 
static_cast<float>(100.1), 123.45},
+            pool.get());
+        CheckResult(row1, row2, {arrow::int64(), arrow::float32(), 
arrow::float64()});
+    }
+    {  // field 1 (float) differs: 100.1 vs 100.4
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {static_cast<int64_t>(9223372036854775800), 
static_cast<float>(100.1), 123.45},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {static_cast<int64_t>(9223372036854775800), 
static_cast<float>(100.4), 123.45},
+            pool.get());
+        CheckResult(row1, row2, {arrow::int64(), arrow::float32(), 
arrow::float64()});
+    }
+    {  // field 2 (double) differs: 1.237E+2 vs 1.238E+2
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {static_cast<int64_t>(9223372036854775800), 
static_cast<float>(100.1), 1.237E+2},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {static_cast<int64_t>(9223372036854775800), 
static_cast<float>(100.1), 1.238E+2},
+            pool.get());
+        CheckResult(row1, row2, {arrow::int64(), arrow::float32(), 
arrow::float64()});
+    }
+    {  // field 0 (utf8) differs: row2's string has one extra trailing character
+        auto bytes = std::make_shared<Bytes>("快乐每一天", pool.get());
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {std::string("abandon"), bytes, 
TimestampType(Timestamp(1725875365442l, 120000), 9)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {std::string("abandon-"), bytes, 
TimestampType(Timestamp(1725875365442l, 120000), 9)},
+            pool.get());
+        CheckResult(row1, row2,
+                    {arrow::utf8(), arrow::binary(), 
arrow::timestamp(arrow::TimeUnit::NANO)});
+    }
+    {  // field 1 (binary) differs: row2's bytes have a trailing '!'
+        auto bytes1 = std::make_shared<Bytes>("快乐每一天", pool.get());
+        auto bytes2 = std::make_shared<Bytes>("快乐每一天!", pool.get());
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {std::string("abandon"), bytes1, 
TimestampType(Timestamp(1725875365442l, 120000), 9)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {std::string("abandon"), bytes2, 
TimestampType(Timestamp(1725875365442l, 120000), 9)},
+            pool.get());
+        CheckResult(row1, row2,
+                    {arrow::utf8(), arrow::binary(), 
arrow::timestamp(arrow::TimeUnit::NANO)});
+    }
+    {  // field 2 (timestamp) differs: second Timestamp argument 120000 vs 120100
+        auto bytes = std::make_shared<Bytes>("快乐每一天", pool.get());
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {std::string("abandon"), bytes, 
TimestampType(Timestamp(1725875365442l, 120000), 9)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {std::string("abandon"), bytes, 
TimestampType(Timestamp(1725875365442l, 120100), 9)},
+            pool.get());
+        CheckResult(row1, row2,
+                    {arrow::utf8(), arrow::binary(), 
arrow::timestamp(arrow::TimeUnit::NANO)});
+    }
+    {  // field 0 (decimal128(38, 10)) differs in the last digit
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {Decimal(38, 10, 
DecimalUtils::StrToInt128("12345678998765432145678").value()),
+             static_cast<int32_t>(10)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {Decimal(38, 10, 
DecimalUtils::StrToInt128("12345678998765432145679").value()),
+             static_cast<int32_t>(10)},
+            pool.get());
+        CheckResult(row1, row2, {arrow::decimal128(38, 10), arrow::date32()});
+    }
+    {  // field 1 (date32) differs: 10 vs 20
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {Decimal(38, 10, 
DecimalUtils::StrToInt128("12345678998765432145678").value()),
+             static_cast<int32_t>(10)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {Decimal(38, 10, 
DecimalUtils::StrToInt128("12345678998765432145678").value()),
+             static_cast<int32_t>(20)},
+            pool.get());
+        CheckResult(row1, row2, {arrow::decimal128(38, 10), arrow::date32()});
+    }
+}
+
+TEST_F(FieldsComparatorTest, TestTimestampType) {
+    auto pool = GetDefaultPool();
+    // Three timestamp columns with different precisions (0, 3 and 6), declared with
+    // SECOND, MILLI and MICRO arrow units; each scope changes one column.
+    {  // column 0 differs: first Timestamp argument 1000 vs 2000
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {TimestampType(Timestamp(1000l, 0), 0), 
TimestampType(Timestamp(1000l, 0), 3),
+             TimestampType(Timestamp(1000l, 1000), 6)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {TimestampType(Timestamp(2000l, 0), 0), 
TimestampType(Timestamp(1000l, 0), 3),
+             TimestampType(Timestamp(1000l, 1000), 6)},
+            pool.get());
+        CheckResult(
+            row1, row2,
+            {arrow::timestamp(arrow::TimeUnit::SECOND), 
arrow::timestamp(arrow::TimeUnit::MILLI),
+             arrow::timestamp(arrow::TimeUnit::MICRO)});
+    }
+    {  // column 1 differs: first Timestamp argument 1000 vs 1001
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {TimestampType(Timestamp(1000l, 0), 0), 
TimestampType(Timestamp(1000l, 0), 3),
+             TimestampType(Timestamp(1000l, 1000), 6)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {TimestampType(Timestamp(1000l, 0), 0), 
TimestampType(Timestamp(1001l, 0), 3),
+             TimestampType(Timestamp(1000l, 1000), 6)},
+            pool.get());
+        CheckResult(
+            row1, row2,
+            {arrow::timestamp(arrow::TimeUnit::SECOND), 
arrow::timestamp(arrow::TimeUnit::MILLI),
+             arrow::timestamp(arrow::TimeUnit::MICRO)});
+    }
+    {  // column 2 differs: second Timestamp argument 1000 vs 2000
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {TimestampType(Timestamp(1000l, 0), 0), 
TimestampType(Timestamp(1000l, 0), 3),
+             TimestampType(Timestamp(1000l, 1000), 6)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {TimestampType(Timestamp(1000l, 0), 0), 
TimestampType(Timestamp(1000l, 0), 3),
+             TimestampType(Timestamp(1000l, 2000), 6)},
+            pool.get());
+        CheckResult(
+            row1, row2,
+            {arrow::timestamp(arrow::TimeUnit::SECOND), 
arrow::timestamp(arrow::TimeUnit::MILLI),
+             arrow::timestamp(arrow::TimeUnit::MICRO)});
+    }
+    {  // same rows as the previous scope, but the arrow types carry the local timezone name
+        auto timezone = DateTimeUtils::GetLocalTimezoneName();
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+            {TimestampType(Timestamp(1000l, 0), 0), 
TimestampType(Timestamp(1000l, 0), 3),
+             TimestampType(Timestamp(1000l, 1000), 6)},
+            pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+            {TimestampType(Timestamp(1000l, 0), 0), 
TimestampType(Timestamp(1000l, 0), 3),
+             TimestampType(Timestamp(1000l, 2000), 6)},
+            pool.get());
+        CheckResult(row1, row2,
+                    {arrow::timestamp(arrow::TimeUnit::SECOND, timezone),
+                     arrow::timestamp(arrow::TimeUnit::MILLI, timezone),
+                     arrow::timestamp(arrow::TimeUnit::MICRO, timezone)});
+    }
+}
+// Null handling: a key that is null in only one row makes that row sort first,
+// and a key that is null in both rows is skipped. Every case passes
+// has_null=true, so only the ascending comparator is exercised.
+TEST_F(FieldsComparatorTest, TestNull) {
+    auto pool = GetDefaultPool();
+    {
+        // Field 0 ties; field 1 is null only in row1, so row1 sorts first.
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow({false, NullType()}, pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow({false, 21}, pool.get());
+        CheckResult(row1, row2, {arrow::boolean(), arrow::int32()}, /*has_null=*/true);
+    }
+    {
+        // Field 0 is null in both rows and is skipped; field 1 decides (false < true).
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow({NullType(), false}, pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow({NullType(), true}, pool.get());
+        CheckResult(row1, row2, {arrow::int32(), arrow::boolean()}, /*has_null=*/true);
+    }
+    {
+        // Sort order {1, 0}: field 1 is null only in row1, so row1 sorts first
+        // even though true > false on field 0.
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow({true, NullType()}, pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow({false, 21}, pool.get());
+        CheckResult(row1, row2, {arrow::boolean(), arrow::int32()}, {1, 0}, /*has_null=*/true);
+    }
+    {
+        // Sort order {1, 0}: field 1 decides (false < true); the null in field 0 is never reached.
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow({21, false}, pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow({NullType(), true}, pool.get());
+        CheckResult(row1, row2, {arrow::int32(), arrow::boolean()}, {1, 0}, /*has_null=*/true);
+    }
+    {
+        // Sort order {1, 0}: field 1 is null in both rows and is skipped; field 0 decides
+        // (false < true). The declared types now match the row layout (bool, int32), so
+        // the boolean slot is no longer read through the int32 getter.
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow({false, NullType()}, pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow({true, NullType()}, pool.get());
+        CheckResult(row1, row2, {arrow::boolean(), arrow::int32()}, {1, 0}, /*has_null=*/true);
+    }
+}
+
+// Only fields 3 and 2 (in that order) are sort keys. Fields 0 and 1 are chosen
+// so that they would order the rows the other way if they were compared.
+TEST_F(FieldsComparatorTest, TestWithSortFields) {
+    auto pool = GetDefaultPool();
+    const std::vector<std::shared_ptr<arrow::DataType>> column_types = {
+        arrow::boolean(), arrow::int8(), arrow::int16(), arrow::int32()};
+    const std::vector<int32_t> key_fields = {3, 2};
+    {
+        // Decided by the first key, field 3: 100000 < 200000.
+        BinaryRow smaller = BinaryRowGenerator::GenerateRow(
+            {true, static_cast<int8_t>(10), static_cast<int16_t>(200),
+             static_cast<int32_t>(100000)},
+            pool.get());
+        BinaryRow larger = BinaryRowGenerator::GenerateRow(
+            {false, static_cast<int8_t>(5), static_cast<int16_t>(100),
+             static_cast<int32_t>(200000)},
+            pool.get());
+        CheckResult(smaller, larger, column_types, key_fields);
+    }
+    {
+        // Field 3 ties, so the second key, field 2, decides: 100 < 200.
+        BinaryRow smaller = BinaryRowGenerator::GenerateRow(
+            {true, static_cast<int8_t>(10), static_cast<int16_t>(100),
+             static_cast<int32_t>(100000)},
+            pool.get());
+        BinaryRow larger = BinaryRowGenerator::GenerateRow(
+            {false, static_cast<int8_t>(5), static_cast<int16_t>(200),
+             static_cast<int32_t>(100000)},
+            pool.get());
+        CheckResult(smaller, larger, column_types, key_fields);
+    }
+}
+
+// A map-typed field has no comparator, so creation must fail and the message
+// must name the offending type and field index.
+TEST_F(FieldsComparatorTest, TestInvalidType) {
+    auto map_type = arrow::map(arrow::int8(), arrow::int16());
+    const std::vector<DataField> fields = {DataField(0, arrow::field("f0", arrow::int32())),
+                                           DataField(1, arrow::field("f1", map_type))};
+    ASSERT_NOK_WITH_MSG(FieldsComparator::Create(fields, /*is_ascending_order=*/true),
+                        "Do not support comparing map<int8, int16> type in idx 1");
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/internal_row_utils.h 
b/src/paimon/common/utils/internal_row_utils.h
new file mode 100644
index 0000000..2a68724
--- /dev/null
+++ b/src/paimon/common/utils/internal_row_utils.h
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/type.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Static-only helpers: conversions between string vectors and array data,
+/// and construction of per-field getters for a schema.
+class InternalRowUtils {
+ public:
+    InternalRowUtils() = delete;
+    ~InternalRowUtils() = delete;
+
+    /// Copies every element of `data` into a vector; null elements become `std::nullopt`.
+    static std::vector<std::optional<std::string>> FromStringArrayData(const InternalArray* data) {
+        std::vector<std::optional<std::string>> result;
+        result.reserve(data->Size());
+        for (auto pos = 0; pos < data->Size(); ++pos) {
+            if (data->IsNullAt(pos)) {
+                result.emplace_back(std::nullopt);
+                continue;
+            }
+            result.emplace_back(data->GetString(pos).ToString());
+        }
+        return result;
+    }
+
+    /// Copies every element of `data` into a vector without checking for nulls.
+    static std::vector<std::string> FromNotNullStringArrayData(const InternalArray* data) {
+        std::vector<std::string> result;
+        result.reserve(data->Size());
+        for (auto pos = 0; pos < data->Size(); ++pos) {
+            result.push_back(data->GetString(pos).ToString());
+        }
+        return result;
+    }
+
+    /// Writes `strs` into a new `BinaryArray`; empty optionals are written as nulls.
+    static BinaryArray ToStringArrayData(const std::vector<std::optional<std::string>>& strs,
+                                         const std::shared_ptr<MemoryPool>& memory_pool) {
+        BinaryArray array;
+        // NOTE(review): the literal 8 is presumably the per-element size expected by the
+        // writer - confirm against BinaryArrayWriter.
+        BinaryArrayWriter writer(&array, strs.size(), 8, memory_pool.get());
+        size_t pos = 0;
+        for (const auto& maybe_str : strs) {
+            if (maybe_str.has_value()) {
+                writer.WriteString(pos, BinaryString::FromString(*maybe_str, memory_pool.get()));
+            } else {
+                writer.SetNullAt(pos);
+            }
+            ++pos;
+        }
+        writer.Complete();
+        return array;
+    }
+
+    /// Writes `strs` into a new `BinaryArray`; every element is written as a string.
+    static BinaryArray ToNotNullStringArrayData(const std::vector<std::string>& strs,
+                                                const std::shared_ptr<MemoryPool>& memory_pool) {
+        BinaryArray array;
+        BinaryArrayWriter writer(&array, strs.size(), 8, memory_pool.get());
+        size_t pos = 0;
+        for (const auto& str : strs) {
+            writer.WriteString(pos, BinaryString::FromString(str, memory_pool.get()));
+            ++pos;
+        }
+        writer.Complete();
+        return array;
+    }
+
+    /// Creates one field getter per field of `value_schema`, in schema order.
+    /// `use_view` is passed through unchanged to `InternalRow::CreateFieldGetter`.
+    /// Returns the first error reported while creating a getter.
+    static Result<std::vector<InternalRow::FieldGetterFunc>> CreateFieldGetters(
+        const std::shared_ptr<arrow::Schema>& value_schema, bool use_view) {
+        std::vector<InternalRow::FieldGetterFunc> getters;
+        getters.reserve(value_schema->num_fields());
+        for (int32_t field_idx = 0; field_idx < value_schema->num_fields(); ++field_idx) {
+            const auto& field_type = value_schema->field(field_idx)->type();
+            PAIMON_ASSIGN_OR_RAISE(
+                InternalRow::FieldGetterFunc getter,
+                InternalRow::CreateFieldGetter(field_idx, field_type, use_view));
+            getters.emplace_back(std::move(getter));
+        }
+        return getters;
+    }
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/internal_row_utils_test.cpp 
b/src/paimon/common/utils/internal_row_utils_test.cpp
new file mode 100644
index 0000000..71c97ee
--- /dev/null
+++ b/src/paimon/common/utils/internal_row_utils_test.cpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/internal_row_utils.h"
+
+#include "gtest/gtest.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace paimon::test {
+TEST(InternalRowUtilsTest, TestToAndFromNotNullStringArrayData) {
+    // Round trip: strings -> BinaryArray -> strings must give back the input unchanged.
+    auto memory_pool = GetDefaultPool();
+    std::vector<std::string> expected = {"aa", "bb", "cc", "dd"};
+    auto binary_array = InternalRowUtils::ToNotNullStringArrayData(expected, memory_pool);
+    auto actual = InternalRowUtils::FromNotNullStringArrayData(&binary_array);
+    ASSERT_EQ(expected, actual);
+}
+
+TEST(InternalRowUtilsTest, TestToAndFromStringArrayData) {
+    // Round trip through a BinaryArray for optional strings; the `std::nullopt` entry must
+    // come back as `std::nullopt`.
+    auto memory_pool = GetDefaultPool();
+    std::vector<std::optional<std::string>> expected = {"aa", std::nullopt, "bb", "cc",
+                                                        "dd"};
+    auto binary_array = InternalRowUtils::ToStringArrayData(expected, memory_pool);
+    auto actual = InternalRowUtils::FromStringArrayData(&binary_array);
+    ASSERT_EQ(expected, actual);
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/projected_array.h 
b/src/paimon/common/utils/projected_array.h
new file mode 100644
index 0000000..f157a82
--- /dev/null
+++ b/src/paimon/common/utils/projected_array.h
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class Bytes;
+class InternalMap;
+class InternalRow;
+
+/// An implementation of `InternalArray` which provides a projected view of the underlying
+/// `InternalArray`.
+///
+/// Position `pos` of the view reads element `mapping[pos]` of the underlying array, so a
+/// projection can both drop and reorder elements. A negative mapping entry marks a position
+/// without a backing element: `IsNullAt()` reports it as null, and the typed getters must not
+/// be called for it (checked with `assert`).
+class ProjectedArray : public InternalArray {
+ public:
+    /// @param array underlying array; must not be null (checked with `assert`).
+    /// @param mapping `mapping[pos]` is the index into `array` served at position `pos`; a
+    ///        negative entry makes that position null.
+    ProjectedArray(const std::shared_ptr<InternalArray>& array, const std::vector<int32_t>& mapping)
+        : array_(array), mapping_(mapping) {
+        assert(array_);
+    }
+
+    /// Number of projected positions, i.e. the length of the mapping.
+    int32_t Size() const override {
+        // Explicit cast: `mapping_.size()` is a `size_t`, the interface returns `int32_t`.
+        return static_cast<int32_t>(mapping_.size());
+    }
+
+    /// True when the position is unmapped (negative entry) or the mapped element is null.
+    bool IsNullAt(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < mapping_.size());
+        const int32_t source_pos = mapping_[pos];
+        return source_pos < 0 || array_->IsNullAt(source_pos);
+    }
+
+    // Typed getters: each forwards to the underlying array at the mapped index.
+
+    bool GetBoolean(int32_t pos) const override {
+        return array_->GetBoolean(SourceIndex(pos));
+    }
+
+    char GetByte(int32_t pos) const override {
+        return array_->GetByte(SourceIndex(pos));
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return array_->GetShort(SourceIndex(pos));
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return array_->GetInt(SourceIndex(pos));
+    }
+
+    /// Dates are read through `GetInt()`.
+    int32_t GetDate(int32_t pos) const override {
+        return GetInt(pos);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return array_->GetLong(SourceIndex(pos));
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return array_->GetFloat(SourceIndex(pos));
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return array_->GetDouble(SourceIndex(pos));
+    }
+
+    BinaryString GetString(int32_t pos) const override {
+        return array_->GetString(SourceIndex(pos));
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return array_->GetStringView(SourceIndex(pos));
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override {
+        return array_->GetDecimal(SourceIndex(pos), precision, scale);
+    }
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override {
+        return array_->GetTimestamp(SourceIndex(pos), precision);
+    }
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return array_->GetBinary(SourceIndex(pos));
+    }
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override {
+        return array_->GetArray(SourceIndex(pos));
+    }
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override {
+        return array_->GetMap(SourceIndex(pos));
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override {
+        return array_->GetRow(SourceIndex(pos), num_fields);
+    }
+
+    // Bulk conversions are not offered by a projected view; each one returns `Status::Invalid`.
+
+    Result<std::vector<char>> ToBooleanArray() const override {
+        return Status::Invalid("projected array do not support convert to boolean array");
+    }
+    Result<std::vector<char>> ToByteArray() const override {
+        return Status::Invalid("projected array do not support convert to byte array");
+    }
+    Result<std::vector<int16_t>> ToShortArray() const override {
+        return Status::Invalid("projected array do not support convert to short array");
+    }
+    Result<std::vector<int32_t>> ToIntArray() const override {
+        return Status::Invalid("projected array do not support convert to int array");
+    }
+    Result<std::vector<int64_t>> ToLongArray() const override {
+        return Status::Invalid("projected array do not support convert to long array");
+    }
+    Result<std::vector<float>> ToFloatArray() const override {
+        return Status::Invalid("projected array do not support convert to float array");
+    }
+    Result<std::vector<double>> ToDoubleArray() const override {
+        return Status::Invalid("projected array do not support convert to double array");
+    }
+
+ private:
+    /// Translates a projected position into an index of the underlying array.
+    ///
+    /// Debug builds check that `pos` is inside the mapping (a negative `pos` fails the same
+    /// check after the unsigned cast) and that the position is mapped, so a negative index is
+    /// never handed to the underlying array unnoticed.
+    int32_t SourceIndex(int32_t pos) const {
+        assert(static_cast<size_t>(pos) < mapping_.size());
+        assert(mapping_[pos] >= 0);
+        return mapping_[pos];
+    }
+
+    std::shared_ptr<InternalArray> array_;
+    std::vector<int32_t> mapping_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/projected_array_test.cpp 
b/src/paimon/common/utils/projected_array_test.cpp
new file mode 100644
index 0000000..b0dcdcf
--- /dev/null
+++ b/src/paimon/common/utils/projected_array_test.cpp
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/projected_array.h"
+
+#include <algorithm>
+#include <optional>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_map.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+
+namespace paimon::test {
+TEST(ProjectedArrayTest, TestSimple) {  // one scope per element type: write, project, read back
+    auto pool = GetDefaultPool();
+    {  // boolean
+        std::vector<bool> arr = {true, false};
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), sizeof(bool), 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteBoolean(i, arr[i]);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {1, 0, -1};  // swap the two elements; position 2 is unmapped
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(projected_array.GetBoolean(1), true);
+        ASSERT_EQ(projected_array.GetBoolean(0), false);
+        ASSERT_TRUE(projected_array.IsNullAt(2));
+    }
+    {  // byte
+        std::vector<int8_t> arr = {1, 2, 3, 4, 5};
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), sizeof(int8_t), 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteByte(i, arr[i]);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {4, 3, 2, 1, 0, -1};  // reversed order; position 5 is unmapped
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(projected_array.GetByte(0), 5);
+        ASSERT_EQ(projected_array.GetByte(1), 4);
+        ASSERT_EQ(projected_array.GetByte(4), 1);
+        ASSERT_TRUE(projected_array.IsNullAt(5));
+    }
+    {  // short
+        std::vector<int16_t> arr = {1, 2, 3, 4, 5};
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), sizeof(int16_t), 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteShort(i, arr[i]);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {4, 3, 2, 1, 0, -1};
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(projected_array.GetShort(0), 5);
+        ASSERT_EQ(projected_array.GetShort(1), 4);
+        ASSERT_EQ(projected_array.GetShort(4), 1);
+        ASSERT_TRUE(projected_array.IsNullAt(5));
+    }
+    {  // int
+        std::vector<int32_t> arr = {1, 2, 3, 4, 5};
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), sizeof(int32_t), 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteInt(i, arr[i]);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {4, 3, 2, 1, 0, -1};
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(projected_array.GetInt(0), 5);
+        ASSERT_EQ(projected_array.GetInt(1), 4);
+        ASSERT_EQ(projected_array.GetInt(4), 1);
+        ASSERT_TRUE(projected_array.IsNullAt(5));
+    }
+    {
+        // date: written with WriteInt and read back through GetDate
+        std::vector<int32_t> arr = {10000, 20006};
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), sizeof(int32_t), 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteInt(i, arr[i]);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {1, 0, -1};
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(projected_array.GetDate(0), 20006);
+        ASSERT_EQ(projected_array.GetDate(1), 10000);
+        ASSERT_TRUE(projected_array.IsNullAt(2));
+    }
+    {  // float
+        std::vector<float> arr = {1.0, 2.0};
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), sizeof(float), 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteFloat(i, arr[i]);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {1, 0, -1};
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(projected_array.GetFloat(0), 2.0);
+        ASSERT_EQ(projected_array.GetFloat(1), 1.0);
+        ASSERT_TRUE(projected_array.IsNullAt(2));
+    }
+    {  // double
+        std::vector<double> arr = {1.0, 2.0};
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), sizeof(double), 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteDouble(i, arr[i]);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {1, 0, -1};
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(projected_array.GetDouble(0), 2.0);
+        ASSERT_EQ(projected_array.GetDouble(1), 1.0);
+        ASSERT_TRUE(projected_array.IsNullAt(2));
+    }
+    // decimal: two scopes, one per precision range (precision 6 and precision 20)
+    {
+        // precision 6 (<= 18); NOTE(review): presumably the *compact* decimal form - confirm
+        std::vector<Decimal> arr = {Decimal(6, 2, 123456), Decimal(6, 3, 
123456)};
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), /*element_size=*/8, 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteDecimal(i, arr[i], 6);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {1, 0, -1};
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(projected_array.GetDecimal(0, 6, 3), Decimal(6, 3, 123456));
+        ASSERT_EQ(projected_array.GetDecimal(1, 6, 2), Decimal(6, 2, 123456));
+        ASSERT_TRUE(projected_array.IsNullAt(2));
+    }
+    {
+        // precision 20 (> 18); NOTE(review): presumably the *non-compact* decimal form - confirm
+        std::vector<Decimal> arr = {Decimal(/*precision=*/20, /*scale=*/3, 
123456),
+                                    Decimal(/*precision=*/20, /*scale=*/3, 
123456789)};
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), /*element_size=*/8, 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteDecimal(i, arr[i], /*precision=*/20);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {1, 0, -1};
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(projected_array.GetDecimal(0, 20, 3), Decimal(20, 3, 
123456789));
+        ASSERT_EQ(projected_array.GetDecimal(1, 20, 3), Decimal(20, 3, 
123456));
+        ASSERT_TRUE(projected_array.IsNullAt(2));
+    }
+    // timestamp: written and read back with precision 9
+    {
+        std::vector<Timestamp> arr = {Timestamp(0, 0), Timestamp(12345, 1)};
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), /*element_size=*/8, 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteTimestamp(i, arr[i], 9);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {1, 0, -1};
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(projected_array.GetTimestamp(0, 9), Timestamp(12345, 1));
+        ASSERT_EQ(projected_array.GetTimestamp(1, 9), Timestamp(0, 0));
+        ASSERT_TRUE(projected_array.IsNullAt(2));
+    }
+    // binary: GetBinary returns a pointer, so the pointee is compared with the written Bytes
+    {
+        std::vector<Bytes> arr;
+        arr.emplace_back("hello", pool.get());
+        arr.emplace_back("world", pool.get());
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), /*element_size=*/8, 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteBinary(i, arr[i]);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {1, 0, -1};
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(*projected_array.GetBinary(0), arr[1]);
+        ASSERT_EQ(*projected_array.GetBinary(1), arr[0]);
+        ASSERT_TRUE(projected_array.IsNullAt(2));
+    }
+    // string: read back as BinaryString via GetString
+    {
+        std::vector<BinaryString> arr;
+        arr.push_back(BinaryString::FromString("hello", pool.get()));
+        arr.push_back(BinaryString::FromString("world", pool.get()));
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer =
+            BinaryArrayWriter(array.get(), arr.size(), /*element_size=*/8, 
pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteString(i, arr[i]);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {1, 0, -1};
+        ProjectedArray projected_array(array, mapping);
+        ASSERT_EQ(projected_array.GetString(0), arr[1]);
+        ASSERT_EQ(projected_array.GetString(1), arr[0]);
+        ASSERT_TRUE(projected_array.IsNullAt(2));
+    }
+    // nested array: the outer array holds two int arrays as its elements
+    {
+        // first nested element: {1, 2}
+        std::vector<int32_t> arr1 = {1, 2};
+        auto array1 = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer1 =
+            BinaryArrayWriter(array1.get(), arr1.size(), sizeof(int32_t), 
pool.get());
+        for (size_t i = 0; i < arr1.size(); i++) {
+            writer1.WriteInt(i, arr1[i]);
+        }
+        writer1.Complete();
+        // second nested element: {100, 200}
+        std::vector<int32_t> arr2 = {100, 200};
+        auto array2 = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer2 =
+            BinaryArrayWriter(array2.get(), arr2.size(), sizeof(int32_t), 
pool.get());
+        for (size_t i = 0; i < arr2.size(); i++) {
+            writer2.WriteInt(i, arr2[i]);
+        }
+        writer2.Complete();
+        // outer array containing both nested arrays
+        std::vector<std::shared_ptr<BinaryArray>> arr;
+        arr.push_back(array1);
+        arr.push_back(array2);
+        auto array = std::make_shared<BinaryArray>();
+        BinaryArrayWriter writer = BinaryArrayWriter(array.get(), arr.size(), 
8, pool.get());
+        for (size_t i = 0; i < arr.size(); i++) {
+            writer.WriteArray(i, *arr[i]);
+        }
+        writer.Complete();
+
+        std::vector<int32_t> mapping = {1, 0, -1};
+        ProjectedArray projected_array(array, mapping);
+
+        auto ret0 = 
std::dynamic_pointer_cast<BinaryArray>(projected_array.GetArray(0));
+        auto ret1 = 
std::dynamic_pointer_cast<BinaryArray>(projected_array.GetArray(1));
+        ASSERT_TRUE(ret0);
+        ASSERT_TRUE(ret1);
+        ASSERT_EQ(*ret0, *arr[1]);
+        ASSERT_EQ(*ret1, *arr[0]);
+        ASSERT_TRUE(projected_array.IsNullAt(2));
+    }
+}
+
+TEST(ProjectedArrayTest, TestGetStringView) {
+    // Two strings are written in order and then read through a view that swaps them and adds
+    // one unmapped position.
+    auto memory_pool = GetDefaultPool();
+    std::vector<BinaryString> strings;
+    strings.push_back(BinaryString::FromString("hello", memory_pool.get()));
+    strings.push_back(BinaryString::FromString("world", memory_pool.get()));
+
+    auto source = std::make_shared<BinaryArray>();
+    BinaryArrayWriter array_writer(source.get(), strings.size(), /*element_size=*/8,
+                                   memory_pool.get());
+    for (size_t idx = 0; idx != strings.size(); ++idx) {
+        array_writer.WriteString(idx, strings[idx]);
+    }
+    array_writer.Complete();
+
+    std::vector<int32_t> index_mapping = {1, 0, -1};
+    ProjectedArray view(source, index_mapping);
+    ASSERT_EQ(view.GetStringView(0), "world");
+    ASSERT_EQ(view.GetStringView(1), "hello");
+    ASSERT_TRUE(view.IsNullAt(2));
+}
+
+TEST(ProjectedArrayTest, TestGetMap) {
+    auto memory_pool = GetDefaultPool();
+
+    // First map: three int keys with long values.
+    auto first_keys = BinaryArray::FromIntArray({1, 2, 3}, memory_pool.get());
+    auto first_values = BinaryArray::FromLongArray({100ll, 200ll, 300ll}, memory_pool.get());
+    auto first_map = BinaryMap::ValueOf(first_keys, first_values, memory_pool.get());
+
+    // Second map: two int keys with long values.
+    auto second_keys = BinaryArray::FromIntArray({10, 20}, memory_pool.get());
+    auto second_values = BinaryArray::FromLongArray({1000ll, 2000ll}, memory_pool.get());
+    auto second_map = BinaryMap::ValueOf(second_keys, second_values, memory_pool.get());
+
+    auto source = std::make_shared<BinaryArray>();
+    BinaryArrayWriter array_writer(source.get(), /*num_elements=*/2, /*element_size=*/8,
+                                   memory_pool.get());
+    array_writer.WriteMap(0, *first_map);
+    array_writer.WriteMap(1, *second_map);
+    array_writer.Complete();
+
+    std::vector<int32_t> index_mapping = {1, 0, -1};
+    ProjectedArray view(source, index_mapping);
+
+    // View position 0 reads source element 1, i.e. the second map.
+    auto projected_first = view.GetMap(0);
+    ASSERT_EQ(projected_first->Size(), 2);
+    ASSERT_EQ(projected_first->KeyArray()->ToIntArray().value(), (std::vector<int32_t>{10, 20}));
+
+    // View position 1 reads source element 0, i.e. the first map.
+    auto projected_second = view.GetMap(1);
+    ASSERT_EQ(projected_second->Size(), 3);
+    ASSERT_EQ(projected_second->KeyArray()->ToIntArray().value(),
+              (std::vector<int32_t>{1, 2, 3}));
+
+    // View position 2 has no backing element.
+    ASSERT_TRUE(view.IsNullAt(2));
+}
+
+TEST(ProjectedArrayTest, TestGetRow) {
+    auto memory_pool = GetDefaultPool();
+    BinaryRow alice =
+        BinaryRowGenerator::GenerateRow({std::string("Alice"), 30}, memory_pool.get());
+    BinaryRow bob = BinaryRowGenerator::GenerateRow({std::string("Bob"), 40}, memory_pool.get());
+
+    auto source = std::make_shared<BinaryArray>();
+    BinaryArrayWriter array_writer(source.get(), /*num_elements=*/2, /*element_size=*/8,
+                                   memory_pool.get());
+    array_writer.WriteRow(0, alice);
+    array_writer.WriteRow(1, bob);
+    array_writer.Complete();
+
+    std::vector<int32_t> index_mapping = {1, 0, -1};
+    ProjectedArray view(source, index_mapping);
+
+    // View position 0 reads source element 1, i.e. the "Bob" row.
+    auto projected_first = std::dynamic_pointer_cast<BinaryRow>(view.GetRow(0, 2));
+    ASSERT_TRUE(projected_first);
+    ASSERT_EQ(*projected_first, bob);
+
+    // View position 1 reads source element 0, i.e. the "Alice" row.
+    auto projected_second = std::dynamic_pointer_cast<BinaryRow>(view.GetRow(1, 2));
+    ASSERT_TRUE(projected_second);
+    ASSERT_EQ(*projected_second, alice);
+
+    // View position 2 has no backing element.
+    ASSERT_TRUE(view.IsNullAt(2));
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/projected_row.h 
b/src/paimon/common/utils/projected_row.h
new file mode 100644
index 0000000..cf6e146
--- /dev/null
+++ b/src/paimon/common/utils/projected_row.h
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "fmt/format.h"
+#include "fmt/ranges.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class Bytes;
+class InternalArray;
+class InternalMap;
+
+/// An implementation of `InternalRow` which provides a projected view of the underlying
+/// `InternalRow`.
+///
+/// Projection includes both reducing the accessible fields and reordering them. Field `pos` of
+/// the view reads field `mapping[pos]` of the underlying row; a negative mapping entry marks a
+/// field without a backing value, for which `IsNullAt()` returns true and the typed getters must
+/// not be called (checked with `assert`).
+///
+/// @note This class supports only top-level projections, not nested projections.
+class ProjectedRow : public InternalRow {
+ public:
+    // e.g., mapping = [2, 1, -1],
+    // GetInt(pos = 0) return inner row->GetInt(pos = 2)
+    // -1 in mapping indicates null, IsNullAt(pos = 2) return true
+    /// @param row underlying row; must not be null (checked with `assert`).
+    /// @param mapping `mapping[pos]` is the field index of `row` served at position `pos`.
+    ProjectedRow(const std::shared_ptr<InternalRow>& row, const std::vector<int32_t>& mapping)
+        : row_(row), mapping_(mapping) {
+        assert(row_);
+    }
+
+    /// Number of projected fields, i.e. the length of the mapping.
+    int32_t GetFieldCount() const override {
+        // Explicit cast: `mapping_.size()` is a `size_t`, the interface returns `int32_t`.
+        return static_cast<int32_t>(mapping_.size());
+    }
+
+    /// The row kind is read from the underlying row.
+    Result<const RowKind*> GetRowKind() const override {
+        return row_->GetRowKind();
+    }
+
+    /// The row kind is written to the underlying row.
+    void SetRowKind(const RowKind* kind) override {
+        row_->SetRowKind(kind);
+    }
+
+    /// True when the field is unmapped (negative entry) or the mapped field is null.
+    bool IsNullAt(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < mapping_.size());
+        const int32_t source_pos = mapping_[pos];
+        return source_pos < 0 || row_->IsNullAt(source_pos);
+    }
+
+    // Typed getters: each forwards to the underlying row at the mapped field index.
+
+    bool GetBoolean(int32_t pos) const override {
+        return row_->GetBoolean(SourceIndex(pos));
+    }
+
+    char GetByte(int32_t pos) const override {
+        return row_->GetByte(SourceIndex(pos));
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return row_->GetShort(SourceIndex(pos));
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return row_->GetInt(SourceIndex(pos));
+    }
+
+    /// Dates are read through `GetInt()`.
+    int32_t GetDate(int32_t pos) const override {
+        return GetInt(pos);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return row_->GetLong(SourceIndex(pos));
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return row_->GetFloat(SourceIndex(pos));
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return row_->GetDouble(SourceIndex(pos));
+    }
+
+    BinaryString GetString(int32_t pos) const override {
+        return row_->GetString(SourceIndex(pos));
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return row_->GetStringView(SourceIndex(pos));
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override {
+        return row_->GetDecimal(SourceIndex(pos), precision, scale);
+    }
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override {
+        return row_->GetTimestamp(SourceIndex(pos), precision);
+    }
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return row_->GetBinary(SourceIndex(pos));
+    }
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override {
+        return row_->GetArray(SourceIndex(pos));
+    }
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override {
+        return row_->GetMap(SourceIndex(pos));
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override {
+        return row_->GetRow(SourceIndex(pos), num_fields);
+    }
+
+    /// Renders the row kind (or a placeholder when it cannot be read), the index mapping and
+    /// the underlying row's own `ToString()`.
+    std::string ToString() const override {
+        auto row_kind = row_->GetRowKind();
+        std::string row_kind_str =
+            row_kind.ok() ? row_kind.value()->ShortString() : "unknown row kind";
+        return fmt::format("{} {{ indexMapping={}, mutableRow={} }}", row_kind_str,
+                           fmt::join(mapping_, ", "), row_->ToString());
+    }
+
+ private:
+    /// Translates a projected field position into a field index of the underlying row.
+    ///
+    /// Debug builds check that `pos` is inside the mapping (a negative `pos` fails the same
+    /// check after the unsigned cast) and that the field is mapped, so a negative index is
+    /// never handed to the underlying row unnoticed.
+    int32_t SourceIndex(int32_t pos) const {
+        assert(static_cast<size_t>(pos) < mapping_.size());
+        assert(mapping_[pos] >= 0);
+        return mapping_[pos];
+    }
+
+    std::shared_ptr<InternalRow> row_;
+    std::vector<int32_t> mapping_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/projected_row_test.cpp 
b/src/paimon/common/utils/projected_row_test.cpp
new file mode 100644
index 0000000..ade7782
--- /dev/null
+++ b/src/paimon/common/utils/projected_row_test.cpp
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/projected_row.h"
+
+#include <utility>
+#include <variant>
+
+#include "arrow/api.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/columnar/columnar_map.h"
+#include "paimon/common/data/generic_row.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+
+namespace paimon::test {
+TEST(ProjectedRowTest, TestSimple) {  // Checks that ProjectedRow reads each field of the wrapped row through the index mapping.
+    auto pool = GetDefaultPool();
+    auto row = std::make_shared<GenericRow>(16);  // source row with 16 fields; positions 0-14 are filled below
+    row->SetField(0, true);
+    row->SetField(1, static_cast<char>(1));
+    row->SetField(2, static_cast<int16_t>(2));
+    row->SetField(3, static_cast<int32_t>(3));
+    row->SetField(4, static_cast<int64_t>(4));
+    row->SetField(5, static_cast<float>(5.1));
+    row->SetField(6, 6.12);
+    auto str = BinaryString::FromString("abcd", pool.get());
+    row->SetField(7, str);
+    std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("efgh", pool.get());
+    row->SetField(8, bytes);
+    std::string str9 = "apple";
+    row->SetField(9, std::string_view(str9.data(), str9.size()));
+
+    Timestamp ts(100, 20);
+    row->SetField(10, ts);
+    Decimal decimal(/*precision=*/30, /*scale=*/20,
+                    
DecimalUtils::StrToInt128("12345678998765432145678").value());
+    row->SetField(11, decimal);
+
+    auto array = std::make_shared<BinaryArray>(BinaryArray::FromLongArray(
+        {static_cast<int64_t>(10), static_cast<int64_t>(20)}, pool.get()));
+    row->SetField(12, array);
+
+    std::shared_ptr<InternalRow> binary_row =
+        BinaryRowGenerator::GenerateRowPtr({100, 200}, pool.get());
+    row->SetField(13, binary_row);
+
+    auto key = arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 
2, 3]").ValueOrDie();
+    auto value =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(), "[2, 4, 
6]").ValueOrDie();
+    auto map = std::make_shared<ColumnarMap>(key, value, pool, /*offset=*/0, 
/*length=*/3);
+    row->SetField(14, map);
+    // do not set value at pos 15, therefore, pos 15 is null
+
+    std::vector<int32_t> mapping = {15, 14, 13, 12, 11, 10, 9,  8,  7,  6,
+                                    5,  4,  3,  2,  1,  0,  -1, -1, -1, -1};  // the 16 source positions in reverse order, then four -1 entries
+
+    ProjectedRow projected_row(row, mapping);
+
+    // test row kind
+    ASSERT_EQ(projected_row.GetRowKind().value(), RowKind::Insert());
+    projected_row.SetRowKind(RowKind::Delete());  // set through the projection
+    ASSERT_EQ(projected_row.GetRowKind().value(), RowKind::Delete());
+    ASSERT_EQ(row->GetRowKind().value(), RowKind::Delete());  // the wrapped row reports the new kind as well
+
+    ASSERT_EQ(projected_row.GetFieldCount(), 20);  // matches the 20 mapping entries, not the 16 source fields
+
+    ASSERT_TRUE(projected_row.IsNullAt(19));  // positions 16-19 map to -1 and are expected to read as null
+    ASSERT_TRUE(projected_row.IsNullAt(18));
+    ASSERT_TRUE(projected_row.IsNullAt(17));
+    ASSERT_TRUE(projected_row.IsNullAt(16));
+
+    ASSERT_EQ(projected_row.GetBoolean(15), true);  // projected position p reads source field 15 - p
+    ASSERT_EQ(projected_row.GetByte(14), static_cast<char>(1));
+    ASSERT_EQ(projected_row.GetShort(13), static_cast<int16_t>(2));
+    ASSERT_EQ(projected_row.GetInt(12), static_cast<int32_t>(3));
+    ASSERT_EQ(projected_row.GetDate(12), static_cast<int32_t>(3));  // same int32 source field read through GetDate
+    ASSERT_EQ(projected_row.GetLong(11), static_cast<int64_t>(4));
+    ASSERT_EQ(projected_row.GetFloat(10), static_cast<float>(5.1));
+    ASSERT_EQ(projected_row.GetDouble(9), static_cast<double>(6.12));
+    ASSERT_EQ(projected_row.GetString(8), str);
+    ASSERT_EQ(*projected_row.GetBinary(7), *bytes);
+    ASSERT_EQ(std::string(projected_row.GetStringView(6)), str9);
+    ASSERT_EQ(projected_row.GetTimestamp(5, /*precision=*/9), ts);
+    ASSERT_EQ(projected_row.GetDecimal(4, /*precision=*/30, /*scale=*/20), 
decimal);
+    ASSERT_EQ(projected_row.GetArray(3)->ToLongArray().value(), 
array->ToLongArray().value());
+
+    auto binary_row_result = 
std::dynamic_pointer_cast<BinaryRow>(projected_row.GetRow(2, 2));
+    auto binary_row_expected = 
std::dynamic_pointer_cast<BinaryRow>(binary_row);
+    ASSERT_EQ(*binary_row_result, *binary_row_expected);
+
+    ASSERT_EQ(projected_row.GetMap(1)->KeyArray()->ToIntArray().value(),
+              map->KeyArray()->ToIntArray().value());
+    ASSERT_EQ(projected_row.GetMap(1)->ValueArray()->ToLongArray().value(),
+              map->ValueArray()->ToLongArray().value());
+
+    ASSERT_TRUE(projected_row.IsNullAt(0));  // projected 0 maps to source 15, which was never set
+
+    ASSERT_EQ(
+        "-D { indexMapping=15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 
0, -1, -1, -1, -1, "
+        "mutableRow=(true,1,2,3,4,5.100000,6.120000,abcd,efgh,apple,1970-01-01 
"
+        "00:00:00.100000020,123.45678998765432145678,array,row,map,null) }",
+        projected_row.ToString());
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/rapidjson_util.h 
b/src/paimon/common/utils/rapidjson_util.h
index d413f25..7a6c440 100644
--- a/src/paimon/common/utils/rapidjson_util.h
+++ b/src/paimon/common/utils/rapidjson_util.h
@@ -1,20 +1,17 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Copyright 2024-present Alibaba Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 #pragma once
diff --git a/src/paimon/common/utils/rapidjson_util_test.cpp 
b/src/paimon/common/utils/rapidjson_util_test.cpp
index c5011d9..b143653 100644
--- a/src/paimon/common/utils/rapidjson_util_test.cpp
+++ b/src/paimon/common/utils/rapidjson_util_test.cpp
@@ -1,20 +1,17 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Copyright 2024-present Alibaba Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 #include "paimon/common/utils/rapidjson_util.h"

Reply via email to