This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 481bb3b feat: add projected array, projected row, field comparator
and partition computer (#32)
481bb3b is described below
commit 481bb3bbb7fe9d5c4c2c8c408568f67ad5c7d853
Author: lszskye <[email protected]>
AuthorDate: Mon Jun 1 14:44:11 2026 +0800
feat: add projected array, projected row, field comparator and partition
computer (#32)
---
.../common/utils/binary_row_partition_computer.cpp | 164 +++++++++
.../common/utils/binary_row_partition_computer.h | 85 +++++
.../utils/binary_row_partition_computer_test.cpp | 329 ++++++++++++++++++
src/paimon/common/utils/fields_comparator.cpp | 189 +++++++++++
src/paimon/common/utils/fields_comparator.h | 104 ++++++
src/paimon/common/utils/fields_comparator_test.cpp | 331 ++++++++++++++++++
src/paimon/common/utils/internal_row_utils.h | 108 ++++++
.../common/utils/internal_row_utils_test.cpp | 41 +++
src/paimon/common/utils/projected_array.h | 170 ++++++++++
src/paimon/common/utils/projected_array_test.cpp | 370 +++++++++++++++++++++
src/paimon/common/utils/projected_row.h | 173 ++++++++++
src/paimon/common/utils/projected_row_test.cpp | 128 +++++++
src/paimon/common/utils/rapidjson_util.h | 23 +-
src/paimon/common/utils/rapidjson_util_test.cpp | 23 +-
14 files changed, 2212 insertions(+), 26 deletions(-)
diff --git a/src/paimon/common/utils/binary_row_partition_computer.cpp
b/src/paimon/common/utils/binary_row_partition_computer.cpp
new file mode 100644
index 0000000..b9b0685
--- /dev/null
+++ b/src/paimon/common/utils/binary_row_partition_computer.cpp
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/binary_row_partition_computer.h"
+
+#include <algorithm>
+#include <cstddef>
+
+#include "arrow/type.h"
+#include "fmt/format.h"
+#include "fmt/ranges.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/macros.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class MemoryPool;
+
+// Stores copies of the supplied arguments in the corresponding members; no validation happens
+// here. Create() performs the null checks and builds `partition_converters` before calling this.
+BinaryRowPartitionComputer::BinaryRowPartitionComputer(
+ const std::vector<std::string>& partition_keys, const
std::shared_ptr<arrow::Schema>& schema,
+ const std::string& default_part_value,
+ const std::vector<PartitionConverter>& partition_converters,
+ const std::shared_ptr<MemoryPool>& memory_pool)
+ : memory_pool_(memory_pool),
+ partition_keys_(partition_keys),
+ schema_(schema),
+ default_part_value_(default_part_value),
+ partition_converters_(partition_converters) {}
+
+/// Builds a partition computer for `partition_keys`, resolving each key's Arrow type in `schema`.
+///
+/// For every key, in the given order, a string-to-row converter and a row-field-to-string
+/// converter are created once here and stored for later calls.
+///
+/// @param partition_keys names of the partition fields; each must be present in `schema`.
+/// @param schema Arrow schema used to look up the type of every partition key; must not be null.
+/// @param default_part_value string that stands for a null partition value.
+/// @param legacy_partition_name_enabled forwarded to
+///        DataConverterUtils::CreateBinaryRowFieldToStringConverter.
+/// @param memory_pool pool handed to the converters and later to the row writer; must not be null.
+/// @return the computer, or an error status when an argument is null, a key is missing from the
+///         schema, or a converter cannot be created for the key's type.
+Result<std::unique_ptr<BinaryRowPartitionComputer>> BinaryRowPartitionComputer::Create(
+    const std::vector<std::string>& partition_keys, const std::shared_ptr<arrow::Schema>& schema,
+    const std::string& default_part_value, bool legacy_partition_name_enabled,
+    const std::shared_ptr<MemoryPool>& memory_pool) {
+    if (PAIMON_UNLIKELY(schema == nullptr)) {
+        return Status::Invalid(
+            "create binary row partition computer failed, schema is null pointer.");
+    }
+    if (PAIMON_UNLIKELY(memory_pool == nullptr)) {
+        return Status::Invalid(
+            "create binary row partition computer failed, memory pool is null pointer.");
+    }
+    std::vector<PartitionConverter> partition_converters;
+    // Exactly one converter pair is produced per key, so size the vector once up front.
+    partition_converters.reserve(partition_keys.size());
+    for (const auto& partition_key : partition_keys) {
+        PAIMON_ASSIGN_OR_RAISE(arrow::Type::type type_id,
+                               GetTypeFromArrowSchema(schema, partition_key));
+        PAIMON_ASSIGN_OR_RAISE(
+            DataConverterUtils::StrToBinaryRowConverter converter,
+            DataConverterUtils::CreateDataToBinaryRowConverter(type_id, memory_pool.get()));
+        PAIMON_ASSIGN_OR_RAISE(DataConverterUtils::BinaryRowFieldToStrConverter reconverter,
+                               DataConverterUtils::CreateBinaryRowFieldToStringConverter(
+                                   type_id, legacy_partition_name_enabled));
+        partition_converters.emplace_back(partition_key, std::move(converter),
+                                          std::move(reconverter));
+    }
+    // The constructor is private, so std::make_unique cannot be used here.
+    return std::unique_ptr<BinaryRowPartitionComputer>(new BinaryRowPartitionComputer(
+        partition_keys, schema, default_part_value, partition_converters, memory_pool));
+}
+
+/// Encodes a string-valued partition specification as a `BinaryRow`.
+///
+/// Fields are written in the order of the partition keys given at creation. A value equal to the
+/// default partition value becomes a null field; every other value is parsed by the converter
+/// built for that key's type. Keys in `partition` that are not partition keys are never looked up.
+///
+/// @param partition map from partition key to its string value; must contain every partition key.
+/// @return the encoded row, or an error status when a key is missing or a value fails to convert.
+Result<BinaryRow> BinaryRowPartitionComputer::ToBinaryRow(
+    const std::map<std::string, std::string>& partition) const {
+    const size_t field_count = partition_converters_.size();
+    BinaryRow binary_row(field_count);
+    BinaryRowWriter writer(&binary_row, /*initial_size=*/0, memory_pool_.get());
+    for (size_t idx = 0; idx < field_count; ++idx) {
+        const PartitionConverter& entry = partition_converters_[idx];
+        const auto found = partition.find(entry.partition_key);
+        if (found == partition.end()) {
+            return Status::Invalid(
+                fmt::format("can not find partition key '{}' in input partition '{}'",
+                            entry.partition_key, partition));
+        }
+        const std::string& raw_value = found->second;
+        if (raw_value != default_part_value_) {
+            PAIMON_RETURN_NOT_OK(entry.converter(raw_value, idx, &writer));
+        } else {
+            // TODO(yonghao.fyh): when support decimal/ timestamp in partition, use
+            // WriteTimestamp(null) for non compact precision
+            writer.SetNullAt(idx);
+        }
+    }
+    writer.Complete();
+    return binary_row;
+}
+
+/// Decodes a partition `BinaryRow` into (partition key, string value) pairs.
+///
+/// Pairs are produced in the order of the partition keys given at creation. A null field, or a
+/// field whose string form is empty or whitespace-only, is reported as the default partition
+/// value.
+///
+/// @param partition row holding exactly one field per partition key.
+/// @return the pairs, or an error status when the field count does not match the number of
+///         partition keys or a field cannot be converted to a string.
+Result<std::vector<std::pair<std::string, std::string>>>
+BinaryRowPartitionComputer::GeneratePartitionVector(const BinaryRow& partition) const {
+    if (static_cast<size_t>(partition.GetFieldCount()) != partition_converters_.size()) {
+        return Status::Invalid(fmt::format(
+            "partition binary row field count {} not match with partition converter size {}",
+            partition.GetFieldCount(), partition_converters_.size()));
+    }
+    std::vector<std::pair<std::string, std::string>> result;
+    // One pair is emitted per converter, so the final size is known in advance.
+    result.reserve(partition_converters_.size());
+    for (size_t field_idx = 0; field_idx < partition_converters_.size(); field_idx++) {
+        const auto& partition_extractor = partition_converters_[field_idx];
+        const auto& partition_key = partition_extractor.partition_key;
+        const auto& to_str = partition_extractor.reconverter;
+        if (partition.IsNullAt(field_idx)) {
+            result.emplace_back(partition_key, default_part_value_);
+        } else {
+            PAIMON_ASSIGN_OR_RAISE(std::string partition_field_str, to_str(partition, field_idx));
+            if (StringUtils::IsNullOrWhitespaceOnly(partition_field_str)) {
+                partition_field_str = default_part_value_;
+            }
+            // The local string is not used again; move it instead of copying it into the pair.
+            result.emplace_back(partition_key, std::move(partition_field_str));
+        }
+    }
+    return result;
+}
+
+/// Looks up `field_name` in `schema` and returns the Arrow type id of that field.
+///
+/// @param schema schema to search; dereferenced without a null check, callers validate it first.
+/// @param field_name name of the field to resolve.
+/// @return the type id, or an invalid status when `GetFieldByName` yields no field.
+Result<arrow::Type::type> BinaryRowPartitionComputer::GetTypeFromArrowSchema(
+    const std::shared_ptr<arrow::Schema>& schema, const std::string& field_name) {
+    const auto matched = schema->GetFieldByName(field_name);
+    if (matched != nullptr) {
+        return matched->type()->id();
+    }
+    return Status::Invalid(
+        fmt::format("field {} not in schema {}", field_name, schema->ToString()));
+}
+
+/// Renders a partition row as a single string: the string form of each field joined by
+/// `delimiter` and cut to at most `max_length` bytes.
+///
+/// Null fields are rendered as the literal "null". Field-to-string converters are created with
+/// legacy partition naming enabled.
+///
+/// @param partition_type schema describing the fields of `partition`; must not be null.
+/// @param partition row to render; it is read at field positions 0..num_fields-1.
+/// @param delimiter separator placed between consecutive field strings.
+/// @param max_length maximum length of the result. The cut is made with std::string::substr,
+///        so it counts bytes and may split a multi-byte character. A negative value converts to
+///        a very large unsigned count, so no truncation takes place.
+/// @return the rendered string, or an error status when `partition_type` is null or a field
+///         type has no string converter.
+Result<std::string> BinaryRowPartitionComputer::PartToSimpleString(
+    const std::shared_ptr<arrow::Schema>& partition_type, const BinaryRow& partition,
+    const std::string& delimiter, int32_t max_length) {
+    if (PAIMON_UNLIKELY(partition_type == nullptr)) {
+        return Status::Invalid(
+            "convert partition to simple string failed, partition type is null pointer.");
+    }
+    // Build every converter first so that an unsupported type is reported before any field is
+    // read from the row.
+    std::vector<DataConverterUtils::BinaryRowFieldToStrConverter> partition_converters;
+    partition_converters.reserve(partition_type->num_fields());
+    for (const auto& field : partition_type->fields()) {
+        PAIMON_ASSIGN_OR_RAISE(DataConverterUtils::BinaryRowFieldToStrConverter converter,
+                               DataConverterUtils::CreateBinaryRowFieldToStringConverter(
+                                   field->type()->id(), /*legacy_partition_name_enabled=*/true));
+        partition_converters.emplace_back(std::move(converter));
+    }
+    std::vector<std::string> partition_vec;
+    partition_vec.reserve(partition_converters.size());
+    for (size_t field_idx = 0; field_idx < partition_converters.size(); field_idx++) {
+        const auto& to_str = partition_converters[field_idx];
+        if (partition.IsNullAt(field_idx)) {
+            partition_vec.push_back("null");
+        } else {
+            PAIMON_ASSIGN_OR_RAISE(std::string partition_field_str, to_str(partition, field_idx));
+            partition_vec.push_back(std::move(partition_field_str));
+        }
+    }
+    return fmt::format("{}", fmt::join(partition_vec, delimiter)).substr(0, max_length);
+}
+} // namespace paimon
diff --git a/src/paimon/common/utils/binary_row_partition_computer.h
b/src/paimon/common/utils/binary_row_partition_computer.h
new file mode 100644
index 0000000..ae371bf
--- /dev/null
+++ b/src/paimon/common/utils/binary_row_partition_computer.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/type.h"
+#include "paimon/common/utils/data_converter_utils.h"
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+
+namespace paimon {
+
+class BinaryRow;
+class MemoryPool;
+
+// TODO(yonghao.fyh): rethink naming of converter/reconverter
+/// Bundles one partition key with the two converters built for its type: `converter` writes a
+/// string value into a binary row field, `reconverter` turns a binary row field back into a
+/// string.
+struct PartitionConverter {
+    /// Takes ownership of both converters. They are moved into the members: a named rvalue
+    /// reference is an lvalue, so without std::move each one would be copied.
+    PartitionConverter(const std::string& _partition_key,
+                       typename DataConverterUtils::StrToBinaryRowConverter&& _converter,
+                       typename DataConverterUtils::BinaryRowFieldToStrConverter&& _reconverter)
+        : partition_key(_partition_key),
+          converter(std::move(_converter)),
+          reconverter(std::move(_reconverter)) {}
+    std::string partition_key;
+    typename DataConverterUtils::StrToBinaryRowConverter converter;
+    typename DataConverterUtils::BinaryRowFieldToStrConverter reconverter;
+};
+
+/// PartitionComputer for `BinaryRow`.
+///
+/// Converts a partition between its string key/value form and its `BinaryRow` encoding, using
+/// one pair of per-key converters that Create() prepares.
+class BinaryRowPartitionComputer {
+ public:
+ /// Builds a computer for `partition_keys`, whose types are looked up in `schema`.
+ /// `default_part_value` is the string that stands for a null partition value.
+ /// Returns an error status instead of a computer when the inputs are rejected.
+ static Result<std::unique_ptr<BinaryRowPartitionComputer>> Create(
+ const std::vector<std::string>& partition_keys,
+ const std::shared_ptr<arrow::Schema>& schema, const std::string&
default_part_value,
+ bool legacy_partition_name_enabled, const std::shared_ptr<MemoryPool>&
memory_pool);
+
+ /// Encodes a key -> string-value map as a `BinaryRow`, one field per partition key.
+ Result<BinaryRow> ToBinaryRow(const std::map<std::string, std::string>&
partition) const;
+ /// Decodes a partition `BinaryRow` into (partition key, string value) pairs.
+ Result<std::vector<std::pair<std::string, std::string>>>
GeneratePartitionVector(
+ const BinaryRow& partition) const;
+ /// Returns the partition keys this computer was created with.
+ const std::vector<std::string>& GetPartitionKeys() const {
+ return partition_keys_;
+ }
+
+ /// Renders `partition` as its field strings joined by `delimiter`, cut to `max_length`.
+ /// Static: it depends only on its arguments, not on a computer instance.
+ static Result<std::string> PartToSimpleString(
+ const std::shared_ptr<arrow::Schema>& partition_type, const BinaryRow&
partition,
+ const std::string& delimiter, int32_t max_length);
+
+ private:
+ // Private so that instances are only obtained through Create().
+ BinaryRowPartitionComputer(const std::vector<std::string>& partition_keys,
+ const std::shared_ptr<arrow::Schema>& schema,
+ const std::string& default_part_value,
+ const std::vector<PartitionConverter>&
partition_converters,
+ const std::shared_ptr<MemoryPool>& memory_pool);
+
+ // Resolves the Arrow type id of `field_name` in `schema`, or fails if the field is not found.
+ static Result<arrow::Type::type> GetTypeFromArrowSchema(
+ const std::shared_ptr<arrow::Schema>& schema, const std::string&
field_name);
+
+ std::shared_ptr<MemoryPool> memory_pool_;
+ std::vector<std::string> partition_keys_;
+ std::shared_ptr<arrow::Schema> schema_;
+ // String that represents a null partition value in the key/value form.
+ std::string default_part_value_;
+ // One entry per partition key, in the same order as the keys passed to Create().
+ std::vector<PartitionConverter> partition_converters_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/utils/binary_row_partition_computer_test.cpp
b/src/paimon/common/utils/binary_row_partition_computer_test.cpp
new file mode 100644
index 0000000..119cb1b
--- /dev/null
+++ b/src/paimon/common/utils/binary_row_partition_computer_test.cpp
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/binary_row_partition_computer.h"
+
+#include <cstdint>
+#include <limits>
+#include <string>
+#include <variant>
+
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Round-trips partition values through ToBinaryRow() and GeneratePartitionVector() for boolean,
+// integer, floating-point, string and date32 partition fields, then checks the two failure paths
+// of ToBinaryRow(): a missing partition key and a value that does not parse as the field type.
+TEST(BinaryRowPartitionComputerTest, TestToAndFromBinaryRow) {
+ auto pool = GetDefaultPool();
+ arrow::FieldVector fields = {arrow::field("f0", arrow::boolean()),
+ arrow::field("f1", arrow::int8()),
+ arrow::field("f2", arrow::int8()),
+ arrow::field("f3", arrow::int16()),
+ arrow::field("f4", arrow::int16()),
+ arrow::field("f5", arrow::int32()),
+ arrow::field("f6", arrow::int32()),
+ arrow::field("f7", arrow::int64()),
+ arrow::field("f8", arrow::int64()),
+ arrow::field("f9", arrow::float32()),
+ arrow::field("f10", arrow::float64()),
+ arrow::field("f11", arrow::utf8()),
+ arrow::field("f12", arrow::utf8()),
+ arrow::field("f13", arrow::date32()),
+ arrow::field("non-partition-field",
arrow::int32())};
+
+ auto schema = arrow::schema(fields);
+ // "f2" is listed before "f1": row position 1 therefore holds f2 and position 2 holds f1.
+ std::vector<std::string> partition_keys = {"f0", "f2", "f1", "f3", "f4",
"f5", "f6",
+ "f7", "f8", "f9", "f10", "f11",
"f12", "f13"};
+ {
+ // simple case with legacy_partition_name_enabled = true
+ ASSERT_OK_AND_ASSIGN(
+ std::unique_ptr<BinaryRowPartitionComputer> computer,
+ BinaryRowPartitionComputer::Create(partition_keys, schema,
"__DEFAULT_PARTITION__",
+
/*legacy_partition_name_enabled=*/true, pool));
+ std::map<std::string, std::string> partition_map = {
+ {"f0", "true"},
+ {"f1", "10"},
+ {"f2", "-20"},
+ {"f3", "1556"},
+ {"f4", "-2556"},
+ {"f5", "348489"},
+ {"f6", "-448489"},
+ {"f7", "-9223372036854775808"},
+ {"f8", "182737474"},
+ {"f9", "0.334"},
+ {"f10", "467.66472"},
+ {"f11", "abcde"},
+ {"f12", "这是一个很长很长的中文"},
+ {"f13", "5"},
+ };
+ ASSERT_OK_AND_ASSIGN(BinaryRow row,
computer->ToBinaryRow(partition_map));
+ ASSERT_EQ(14, row.GetFieldCount());
+ ASSERT_EQ(true, row.GetBoolean(0));
+ ASSERT_EQ(-20, row.GetByte(1));
+ ASSERT_EQ(10, row.GetByte(2));
+ ASSERT_EQ(1556, row.GetShort(3));
+ ASSERT_EQ(-2556, row.GetShort(4));
+ ASSERT_EQ(348489, row.GetInt(5));
+ ASSERT_EQ(-448489, row.GetInt(6));
+ ASSERT_EQ(std::numeric_limits<int64_t>::min(), row.GetLong(7));
+ ASSERT_EQ(182737474l, row.GetLong(8));
+ ASSERT_NEAR(0.334, row.GetFloat(9), 0.0000001);
+ ASSERT_NEAR(467.66472, row.GetDouble(10), 0.0000001);
+ ASSERT_EQ("abcde", row.GetString(11).ToString());
+ ASSERT_EQ("这是一个很长很长的中文", row.GetString(12).ToString());
+ ASSERT_EQ(5, row.GetDate(13));
+
+ // Decoding the row must reproduce the input map exactly.
+ std::vector<std::pair<std::string, std::string>> part_values;
+ ASSERT_OK_AND_ASSIGN(part_values,
computer->GeneratePartitionVector(row));
+ ASSERT_EQ(14, part_values.size());
+ std::map<std::string, std::string> actual_part_values_map;
+ for (const auto& [key, value] : part_values) {
+ actual_part_values_map[key] = value;
+ }
+ ASSERT_EQ(actual_part_values_map, partition_map);
+ }
+ {
+ // simple case with legacy_partition_name_enabled = false
+ // Here the date32 key "f13" is given as "1970-01-06" rather than "5"; both cases expect
+ // GetDate(13) == 5.
+ ASSERT_OK_AND_ASSIGN(
+ std::unique_ptr<BinaryRowPartitionComputer> computer,
+ BinaryRowPartitionComputer::Create(partition_keys, schema,
"__DEFAULT_PARTITION__",
+
/*legacy_partition_name_enabled=*/false, pool));
+ std::map<std::string, std::string> partition_map = {
+ {"f0", "true"},
+ {"f1", "10"},
+ {"f2", "-20"},
+ {"f3", "1556"},
+ {"f4", "-2556"},
+ {"f5", "348489"},
+ {"f6", "-448489"},
+ {"f7", "-9223372036854775808"},
+ {"f8", "182737474"},
+ {"f9", "0.334"},
+ {"f10", "467.66472"},
+ {"f11", "abcde"},
+ {"f12", "这是一个很长很长的中文"},
+ {"f13", "1970-01-06"},
+ };
+ ASSERT_OK_AND_ASSIGN(BinaryRow row,
computer->ToBinaryRow(partition_map));
+ ASSERT_EQ(14, row.GetFieldCount());
+ ASSERT_EQ(true, row.GetBoolean(0));
+ ASSERT_EQ(-20, row.GetByte(1));
+ ASSERT_EQ(10, row.GetByte(2));
+ ASSERT_EQ(1556, row.GetShort(3));
+ ASSERT_EQ(-2556, row.GetShort(4));
+ ASSERT_EQ(348489, row.GetInt(5));
+ ASSERT_EQ(-448489, row.GetInt(6));
+ ASSERT_EQ(std::numeric_limits<int64_t>::min(), row.GetLong(7));
+ ASSERT_EQ(182737474l, row.GetLong(8));
+ ASSERT_NEAR(0.334, row.GetFloat(9), 0.0000001);
+ ASSERT_NEAR(467.66472, row.GetDouble(10), 0.0000001);
+ ASSERT_EQ("abcde", row.GetString(11).ToString());
+ ASSERT_EQ("这是一个很长很长的中文", row.GetString(12).ToString());
+ ASSERT_EQ(5, row.GetDate(13));
+
+ std::vector<std::pair<std::string, std::string>> part_values;
+ ASSERT_OK_AND_ASSIGN(part_values,
computer->GeneratePartitionVector(row));
+ ASSERT_EQ(14, part_values.size());
+ std::map<std::string, std::string> actual_part_values_map;
+ for (const auto& [key, value] : part_values) {
+ actual_part_values_map[key] = value;
+ }
+ ASSERT_EQ(actual_part_values_map, partition_map);
+ }
+ {
+ // simple case with default partition value
+ // "f12" equals the default value and is stored as null. "f11" is whitespace-only: it is
+ // stored as a string but reported back as the default value by GeneratePartitionVector().
+ ASSERT_OK_AND_ASSIGN(
+ std::unique_ptr<BinaryRowPartitionComputer> computer,
+ BinaryRowPartitionComputer::Create(partition_keys, schema,
"__DEFAULT_PARTITION__",
+
/*legacy_partition_name_enabled=*/true, pool));
+ std::map<std::string, std::string> partition_map = {
+ {"f0", "true"},
+ {"f1", "10"},
+ {"f2", "-20"},
+ {"f3", "1556"},
+ {"f4", "-2556"},
+ {"f5", "348489"},
+ {"f6", "-448489"},
+ {"f7", "-9223372036854775808"},
+ {"f8", "182737474"},
+ {"f9", "0.334"},
+ {"f10", "467.66472"},
+ {"f11", " "},
+ {"f12", "__DEFAULT_PARTITION__"},
+ {"f13", "5"},
+ };
+ ASSERT_OK_AND_ASSIGN(BinaryRow row,
computer->ToBinaryRow(partition_map));
+ ASSERT_EQ(14, row.GetFieldCount());
+ ASSERT_EQ(true, row.GetBoolean(0));
+ ASSERT_EQ(-20, row.GetByte(1));
+ ASSERT_EQ(10, row.GetByte(2));
+ ASSERT_EQ(1556, row.GetShort(3));
+ ASSERT_EQ(-2556, row.GetShort(4));
+ ASSERT_EQ(348489, row.GetInt(5));
+ ASSERT_EQ(-448489, row.GetInt(6));
+ ASSERT_EQ(std::numeric_limits<int64_t>::min(), row.GetLong(7));
+ ASSERT_EQ(182737474l, row.GetLong(8));
+ ASSERT_NEAR(0.334, row.GetFloat(9), 0.0000001);
+ ASSERT_NEAR(467.66472, row.GetDouble(10), 0.0000001);
+ ASSERT_EQ(" ", row.GetString(11).ToString());
+ ASSERT_TRUE(row.IsNullAt(12));
+ // NOTE(review): the date32 field is read with GetInt() here, while the two cases above
+ // use GetDate() for the same position - confirm whether this is intentional.
+ ASSERT_EQ(5, row.GetInt(13));
+
+ std::vector<std::pair<std::string, std::string>> part_values;
+ ASSERT_OK_AND_ASSIGN(part_values,
computer->GeneratePartitionVector(row));
+ ASSERT_EQ(14, part_values.size());
+ std::map<std::string, std::string> actual_part_values_map;
+ for (const auto& [key, value] : part_values) {
+ actual_part_values_map[key] = value;
+ }
+ std::map<std::string, std::string> expected_map = {
+ {"f0", "true"},
+ {"f1", "10"},
+ {"f2", "-20"},
+ {"f3", "1556"},
+ {"f4", "-2556"},
+ {"f5", "348489"},
+ {"f6", "-448489"},
+ {"f7", "-9223372036854775808"},
+ {"f8", "182737474"},
+ {"f9", "0.334"},
+ {"f10", "467.66472"},
+ {"f11", "__DEFAULT_PARTITION__"},
+ {"f12", "__DEFAULT_PARTITION__"},
+ {"f13", "5"},
+ };
+ ASSERT_EQ(actual_part_values_map, expected_map);
+ }
+ {
+ // test partition_str does not contain all partition keys, f4
+ // "f13" is absent from the map as well; the expected message names "f4", which comes
+ // first in partition_keys.
+ ASSERT_OK_AND_ASSIGN(
+ std::unique_ptr<BinaryRowPartitionComputer> computer,
+ BinaryRowPartitionComputer::Create(partition_keys, schema,
"__DEFAULT_PARTITION__",
+
/*legacy_partition_name_enabled=*/true, pool));
+ std::map<std::string, std::string> partition_map = {{"f0", "true"},
+ {"f1", "10"},
+ {"f2", "-20"},
+ {"f3", "1556"},
+ {"f5", "348489"},
+ {"f6", "-448489"},
+ {"f7",
"-9223372036854775808"},
+ {"f8",
"182737474"},
+ {"f9", "0.334"},
+ {"f10",
"467.66472"},
+ {"f11", "abcde"},
+ {"f12",
"这是一个很长很长的中文"}};
+
+ ASSERT_NOK_WITH_MSG(computer->ToBinaryRow(partition_map),
+ "can not find partition key 'f4' in input
partition");
+ }
+ {
+ // test partition_str mismatches schema, f6="abcd"
+ ASSERT_OK_AND_ASSIGN(
+ std::unique_ptr<BinaryRowPartitionComputer> computer,
+ BinaryRowPartitionComputer::Create(partition_keys, schema,
"__DEFAULT_PARTITION__",
+
/*legacy_partition_name_enabled=*/true, pool));
+
+ std::map<std::string, std::string> partition_map = {{"f0", "true"},
+ {"f1", "10"},
+ {"f2", "-20"},
+ {"f3", "1556"},
+ {"f4", "-2556"},
+ {"f5", "348489"},
+ {"f6", "abcd"},
+ {"f7",
"-9223372036854775808"},
+ {"f8",
"182737474"},
+ {"f9", "0.334"},
+ {"f10",
"467.66472"},
+ {"f11", "abcde"},
+ {"f12",
"这是一个很长很长的中文"}};
+ ASSERT_NOK_WITH_MSG(computer->ToBinaryRow(partition_map),
+ "cannot convert field idx 6, field value abcd to
type INT32");
+ }
+}
+
+// Checks that GeneratePartitionVector() reports whitespace-only and empty string fields as the
+// default partition value, while a string with trailing spaces but other content is kept as is.
+TEST(BinaryRowPartitionComputerTest, TestNullOrWhitespaceOnlyStr) {
+ auto pool = GetDefaultPool();
+ arrow::FieldVector fields = {
+ arrow::field("f0", arrow::utf8()),
+ arrow::field("f1", arrow::utf8()),
+ arrow::field("f2", arrow::utf8()),
+ };
+
+ auto schema = arrow::schema(fields);
+ std::vector<std::string> partition_keys = {"f0", "f1", "f2"};
+ ASSERT_OK_AND_ASSIGN(
+ std::unique_ptr<BinaryRowPartitionComputer> computer,
+ BinaryRowPartitionComputer::Create(partition_keys, schema,
"__DEFAULT_PARTITION__",
+
/*legacy_partition_name_enabled=*/true, pool));
+
+ // The row is built directly with the test generator, so the strings reach the computer
+ // without passing through ToBinaryRow().
+ ASSERT_OK_AND_ASSIGN(auto partition_key_values,
+
computer->GeneratePartitionVector(BinaryRowGenerator::GenerateRow(
+ {std::string(" "), std::string(""),
std::string("ab ")}, pool.get())));
+ std::vector<std::pair<std::string, std::string>> expected = {
+ {"f0", "__DEFAULT_PARTITION__"}, {"f1", "__DEFAULT_PARTITION__"},
{"f2", "ab "}};
+ ASSERT_EQ(partition_key_values, expected);
+}
+
+// Exercises PartToSimpleString() with an empty partition, a two-field partition, a null field
+// and a result longer than max_length.
+TEST(BinaryRowPartitionComputerTest, TestPartToSimpleString) {
+ auto pool = GetDefaultPool();
+ {
+ // Empty schema and empty row produce an empty string.
+ auto schema = arrow::schema({});
+ auto partition = BinaryRow::EmptyRow();
+ ASSERT_OK_AND_ASSIGN(std::string ret,
BinaryRowPartitionComputer::PartToSimpleString(
+ schema, partition, "-", 30));
+ ASSERT_EQ(ret, "");
+ }
+ {
+ // Field strings are joined with the delimiter, in schema order.
+ auto schema = arrow::schema({
+ arrow::field("f0", arrow::utf8()),
+ arrow::field("f1", arrow::int32()),
+ });
+ auto partition =
BinaryRowGenerator::GenerateRow({std::string("20240731"), 10}, pool.get());
+ ASSERT_OK_AND_ASSIGN(std::string ret,
BinaryRowPartitionComputer::PartToSimpleString(
+ schema, partition, "-", 30));
+ ASSERT_EQ(ret, "20240731-10");
+ }
+ {
+ // A null field is rendered as the literal "null".
+ auto schema = arrow::schema({
+ arrow::field("f0", arrow::utf8()),
+ arrow::field("f1", arrow::int32()),
+ });
+ auto partition = BinaryRowGenerator::GenerateRow({NullType(), 10},
pool.get());
+ ASSERT_OK_AND_ASSIGN(std::string ret,
BinaryRowPartitionComputer::PartToSimpleString(
+ schema, partition, "-", 30));
+ ASSERT_EQ(ret, "null-10");
+ }
+ {
+ // The joined string "20240731-10" is cut to max_length = 5 characters.
+ auto schema = arrow::schema({
+ arrow::field("f0", arrow::utf8()),
+ arrow::field("f1", arrow::int32()),
+ });
+ auto partition =
BinaryRowGenerator::GenerateRow({std::string("20240731"), 10}, pool.get());
+ ASSERT_OK_AND_ASSIGN(std::string ret,
BinaryRowPartitionComputer::PartToSimpleString(
+ schema, partition, "-", 5));
+ ASSERT_EQ(ret, "20240");
+ }
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/utils/fields_comparator.cpp
b/src/paimon/common/utils/fields_comparator.cpp
new file mode 100644
index 0000000..54a5cfd
--- /dev/null
+++ b/src/paimon/common/utils/fields_comparator.cpp
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/fields_comparator.h"
+
+#include <cstddef>
+#include <string>
+
+#include "arrow/api.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/status.h"
+
+namespace paimon {
+/// Creates a comparator that sorts on every field of `input_data_field`, in field order.
+///
+/// Equivalent to calling the three-argument overload with sort fields 0, 1, ..., n-1.
+Result<std::unique_ptr<FieldsComparator>> FieldsComparator::Create(
+    const std::vector<DataField>& input_data_field, bool is_ascending_order) {
+    const auto field_count = static_cast<int32_t>(input_data_field.size());
+    std::vector<int32_t> all_fields(field_count);
+    for (int32_t idx = 0; idx < field_count; ++idx) {
+        all_fields[idx] = idx;
+    }
+    return Create(input_data_field, all_fields, is_ascending_order);
+}
+
+/// Creates a comparator that sorts on the fields selected by `sort_fields`.
+///
+/// @param input_data_field fields of the rows that will be compared.
+/// @param sort_fields indexes into `input_data_field`, in comparison priority order.
+/// @param is_ascending_order when false, the result of each non-null field comparison is negated.
+/// @return the comparator, or an error status when a sort field index is out of range or the
+///         type of a sort field has no comparator.
+Result<std::unique_ptr<FieldsComparator>> FieldsComparator::Create(
+    const std::vector<DataField>& input_data_field, const std::vector<int32_t>& sort_fields,
+    bool is_ascending_order) {
+    std::vector<FieldComparatorFunc> comparators;
+    comparators.reserve(sort_fields.size());
+    for (const auto& sort_field_idx : sort_fields) {
+        // Reject indexes that do not address a field; indexing the vector with them would be
+        // undefined behaviour.
+        if (sort_field_idx < 0 ||
+            static_cast<size_t>(sort_field_idx) >= input_data_field.size()) {
+            return Status::Invalid(
+                fmt::format("sort field index {} is out of range, input field count is {}",
+                            sort_field_idx, input_data_field.size()));
+        }
+        const auto& type = input_data_field[sort_field_idx].Type();
+        PAIMON_ASSIGN_OR_RAISE(FieldComparatorFunc cmp, CompareField(sort_field_idx, type));
+        comparators.emplace_back(std::move(cmp));
+    }
+    // The constructor is invoked with `new` because std::make_unique cannot reach it.
+    return std::unique_ptr<FieldsComparator>(
+        new FieldsComparator(is_ascending_order, sort_fields, std::move(comparators)));
+}
+
+/// Compares two rows on the configured sort fields, in priority order.
+///
+/// Null handling does not depend on the sort direction: when exactly one side is null, the null
+/// side is ordered first (-1 if `lhs` is null, 1 if `rhs` is null). When both sides are null
+/// the field is skipped. For non-null values the first non-zero field comparison decides, and
+/// it is negated for descending order.
+///
+/// @return a negative value, zero or a positive value when `lhs` orders before, equal to or
+///         after `rhs`.
+int32_t FieldsComparator::CompareTo(const InternalRow& lhs, const InternalRow& rhs) const {
+    // in default comparator, null is first (not smallest)
+    for (size_t pos = 0; pos < sort_fields_.size(); ++pos) {
+        const bool left_is_null = lhs.IsNullAt(sort_fields_[pos]);
+        const bool right_is_null = rhs.IsNullAt(sort_fields_[pos]);
+        if (left_is_null || right_is_null) {
+            if (left_is_null && right_is_null) {
+                // Both null: this field cannot decide, move on to the next one.
+                continue;
+            }
+            return left_is_null ? -1 : 1;
+        }
+        const int32_t order = comparators_[pos](lhs, rhs);
+        if (order != 0) {
+            return is_ascending_order_ ? order : -order;
+        }
+    }
+    return 0;
+}
+
+namespace {
+/// Orders two values of the same type using only `==` and `<`: 0 when they are equal, -1 when
+/// `lhs` is smaller, 1 otherwise.
+template <typename T>
+int32_t OrderByEqualAndLess(T lhs, T rhs) {
+    if (lhs == rhs) {
+        return 0;
+    }
+    return lhs < rhs ? -1 : 1;
+}
+
+/// Reduces an arbitrary signed comparison result to -1, 0 or 1.
+int32_t NormalizeSign(int32_t cmp) {
+    if (cmp == 0) {
+        return 0;
+    }
+    return cmp > 0 ? 1 : -1;
+}
+}  // namespace
+
+/// Builds the comparison function for the field at `field_idx`, selected by its Arrow type.
+///
+/// The returned function reads the field from both rows with the getter matching the type and
+/// yields -1, 0 or 1. It does not check for nulls itself.
+///
+/// @param field_idx position of the field in the rows to compare.
+/// @param input_type Arrow type of that field.
+/// @return the comparison function, or a NotImplemented status for an unsupported type.
+Result<FieldsComparator::FieldComparatorFunc> FieldsComparator::CompareField(
+    int32_t field_idx, const std::shared_ptr<arrow::DataType>& input_type) {
+    switch (input_type->id()) {
+        case arrow::Type::type::BOOL:
+            return FieldsComparator::FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const bool left = lhs.GetBoolean(field_idx);
+                    const bool right = rhs.GetBoolean(field_idx);
+                    return OrderByEqualAndLess(left, right);
+                });
+        case arrow::Type::type::INT8:
+            return FieldsComparator::FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const int8_t left = lhs.GetByte(field_idx);
+                    const int8_t right = rhs.GetByte(field_idx);
+                    return OrderByEqualAndLess(left, right);
+                });
+        case arrow::Type::type::INT16:
+            return FieldsComparator::FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const int16_t left = lhs.GetShort(field_idx);
+                    const int16_t right = rhs.GetShort(field_idx);
+                    return OrderByEqualAndLess(left, right);
+                });
+        case arrow::Type::type::DATE32:
+            return FieldsComparator::FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const int32_t left = lhs.GetDate(field_idx);
+                    const int32_t right = rhs.GetDate(field_idx);
+                    return OrderByEqualAndLess(left, right);
+                });
+
+        case arrow::Type::type::INT32:
+            return FieldsComparator::FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const int32_t left = lhs.GetInt(field_idx);
+                    const int32_t right = rhs.GetInt(field_idx);
+                    return OrderByEqualAndLess(left, right);
+                });
+        case arrow::Type::type::INT64:
+            return FieldsComparator::FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const int64_t left = lhs.GetLong(field_idx);
+                    const int64_t right = rhs.GetLong(field_idx);
+                    return OrderByEqualAndLess(left, right);
+                });
+        case arrow::Type::type::FLOAT:
+            // TODO(xinyu.lxy):
+            // currently in java KeyComparatorSupplier: -inf < -0.0 == +0.0 < +inf = nan
+            // paimon-cpp: -inf < -0.0 == +0.0 < +inf and nan cannot be compared
+            return FieldsComparator::FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const float left = lhs.GetFloat(field_idx);
+                    const float right = rhs.GetFloat(field_idx);
+                    return OrderByEqualAndLess(left, right);
+                });
+        case arrow::Type::type::DOUBLE:
+            return FieldsComparator::FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    const double left = lhs.GetDouble(field_idx);
+                    const double right = rhs.GetDouble(field_idx);
+                    return OrderByEqualAndLess(left, right);
+                });
+        case arrow::Type::type::STRING:
+        case arrow::Type::type::BINARY: {
+            // Both types are compared through the string-view getter.
+            return FieldsComparator::FieldComparatorFunc(
+                [field_idx](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    auto left = lhs.GetStringView(field_idx);
+                    auto right = rhs.GetStringView(field_idx);
+                    return NormalizeSign(left.compare(right));
+                });
+        }
+        case arrow::Type::type::TIMESTAMP: {
+            auto timestamp_type =
+                arrow::internal::checked_pointer_cast<arrow::TimestampType>(input_type);
+            assert(timestamp_type);
+            // The precision is resolved once here and captured by the comparator.
+            int32_t precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
+            return FieldsComparator::FieldComparatorFunc(
+                [field_idx, precision](const InternalRow& lhs, const InternalRow& rhs) -> int32_t {
+                    Timestamp left = lhs.GetTimestamp(field_idx, precision);
+                    Timestamp right = rhs.GetTimestamp(field_idx, precision);
+                    if (left == right) {
+                        return 0;
+                    }
+                    return left < right ? -1 : 1;
+                });
+        }
+        case arrow::Type::type::DECIMAL: {
+            auto* decimal_type =
+                arrow::internal::checked_cast<arrow::Decimal128Type*>(input_type.get());
+            assert(decimal_type);
+            auto precision = decimal_type->precision();
+            auto scale = decimal_type->scale();
+            return FieldsComparator::FieldComparatorFunc(
+                [field_idx, precision, scale](const InternalRow& lhs,
+                                              const InternalRow& rhs) -> int32_t {
+                    Decimal left = lhs.GetDecimal(field_idx, precision, scale);
+                    Decimal right = rhs.GetDecimal(field_idx, precision, scale);
+                    return NormalizeSign(left.CompareTo(right));
+                });
+        }
+        default:
+            return Status::NotImplemented(fmt::format("Do not support comparing {} type in idx {}",
+                                                      input_type->ToString(), field_idx));
+    }
+}
+} // namespace paimon
diff --git a/src/paimon/common/utils/fields_comparator.h
b/src/paimon/common/utils/fields_comparator.h
new file mode 100644
index 0000000..7dbeca1
--- /dev/null
+++ b/src/paimon/common/utils/fields_comparator.h
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cmath>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class DataType;
+} // namespace arrow
+
+namespace paimon {
+class DataField;
+
+/// A `Comparator` that compares the file store key.
+///
+/// Instances are obtained through the `Create()` factories; the constructor is private.
+/// Each sort field has one comparison callback, produced by `CompareField()`.
+class FieldsComparator {
+ public:
+    /// Builds a comparator for `input_data_field` without an explicit sort-field list.
+    /// NOTE(review): presumably every field takes part in declaration order -- confirm against
+    /// fields_comparator.cpp.
+    ///
+    /// @param input_data_field fields describing the rows to compare
+    /// @param is_ascending_order requested sort direction
+    /// @return the comparator, or an error status (e.g. when a field type is not comparable)
+    static Result<std::unique_ptr<FieldsComparator>> Create(
+        const std::vector<DataField>& input_data_field, bool is_ascending_order);
+
+    /// Builds a comparator restricted to `sort_fields`.
+    ///
+    /// @param input_data_field fields describing the rows to compare
+    /// @param sort_fields indices of the fields to compare
+    /// @param is_ascending_order requested sort direction
+    /// @return the comparator, or an error status (e.g. when a field type is not comparable)
+    static Result<std::unique_ptr<FieldsComparator>> Create(
+        const std::vector<DataField>& input_data_field, const std::vector<int32_t>& sort_fields,
+        bool is_ascending_order);
+
+    /// Compares `lhs` with `rhs`; the implementation lives in fields_comparator.cpp.
+    int32_t CompareTo(const InternalRow& lhs, const InternalRow& rhs) const;
+
+    /// Returns the field indices this comparator was constructed with.
+    const std::vector<int32_t>& CompareFields() const {
+        return sort_fields_;
+    }
+
+    /// Java-compatible ordering for floating-point types:
+    /// -infinity < -0.0 < +0.0 < +infinity < NaN == NaN
+    /// for range index and sst key comparator
+    ///
+    /// @tparam T a floating-point type (enforced at compile time)
+    /// @return -1 if `a` orders before `b`, 1 if it orders after, 0 if they are equal
+    template <typename T>
+    static int32_t CompareFloatingPoint(T a, T b) {
+        static_assert(std::is_floating_point<T>::value,
+                      "CompareFloatingPoint requires a floating-point type");
+        const bool a_nan = std::isnan(a);
+        const bool b_nan = std::isnan(b);
+        if (a_nan || b_nan) {
+            // NaN equals NaN and orders after every other value.
+            if (a_nan && b_nan) {
+                return 0;
+            }
+            return a_nan ? 1 : -1;
+        }
+        if (a == b) {
+            // operator== treats -0.0 and +0.0 as equal; the sign bit tells them apart.
+            const bool a_neg = std::signbit(a);
+            const bool b_neg = std::signbit(b);
+            if (a_neg == b_neg) {
+                return 0;
+            }
+            return a_neg ? -1 : 1;  // -0.0 < +0.0
+        }
+        return a < b ? -1 : 1;
+    }
+
+ private:
+    /// Callback comparing one field of two rows.
+    using FieldComparatorFunc =
+        std::function<int32_t(const InternalRow& lhs, const InternalRow& rhs)>;
+
+    /// Stores the direction, the sort-field indices and one callback per sort field.
+    FieldsComparator(bool is_ascending_order, const std::vector<int32_t>& sort_fields,
+                     std::vector<FieldComparatorFunc>&& comparators)
+        : is_ascending_order_(is_ascending_order),
+          sort_fields_(sort_fields),
+          comparators_(std::move(comparators)) {
+        // One callback per sort field is required.
+        assert(comparators_.size() == sort_fields_.size());
+    }
+
+    /// Creates the comparison callback for field `field_idx` of type `input_type`, or returns
+    /// an error status when that type cannot be compared.
+    static Result<FieldComparatorFunc> CompareField(
+        int32_t field_idx, const std::shared_ptr<arrow::DataType>& input_type);
+
+ private:
+    bool is_ascending_order_;
+    std::vector<int32_t> sort_fields_;
+    std::vector<FieldComparatorFunc> comparators_;
+};
+} // namespace paimon
diff --git a/src/paimon/common/utils/fields_comparator_test.cpp
b/src/paimon/common/utils/fields_comparator_test.cpp
new file mode 100644
index 0000000..2f64f81
--- /dev/null
+++ b/src/paimon/common/utils/fields_comparator_test.cpp
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/fields_comparator.h"
+
+#include <cstddef>
+#include <string>
+#include <variant>
+
+#include "arrow/api.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+class FieldsComparatorTest : public ::testing::Test {
+ public:
+    void SetUp() override {}
+    void TearDown() override {}
+
+    /// Asserts that `row1` orders strictly before `row2` under an ascending comparator over
+    /// `sort_fields`, and that each row equals itself. Unless `has_null` is set, the same
+    /// rows are also checked with a descending comparator, where the order must be reversed.
+    void CheckResult(const InternalRow& row1, const InternalRow& row2,
+                     const std::vector<std::shared_ptr<arrow::DataType>>& input_types,
+                     const std::vector<int32_t>& sort_fields, bool has_null = false) {
+        // Field ids and names are irrelevant here; only the types matter.
+        std::vector<DataField> fields;
+        fields.reserve(input_types.size());
+        for (const auto& input_type : input_types) {
+            fields.emplace_back(/*id=*/0, arrow::field("fake_name", input_type));
+        }
+
+        ASSERT_OK_AND_ASSIGN(auto ascending, FieldsComparator::Create(fields, sort_fields,
+                                                                      /*is_ascending_order=*/true));
+        ASSERT_EQ(-1, ascending->CompareTo(row1, row2));
+        ASSERT_EQ(1, ascending->CompareTo(row2, row1));
+        ASSERT_EQ(0, ascending->CompareTo(row1, row1));
+        ASSERT_EQ(0, ascending->CompareTo(row2, row2));
+
+        if (has_null) {
+            // Rows containing nulls are only checked in ascending order.
+            return;
+        }
+
+        ASSERT_OK_AND_ASSIGN(auto descending,
+                             FieldsComparator::Create(fields, sort_fields,
+                                                      /*is_ascending_order=*/false));
+        ASSERT_EQ(1, descending->CompareTo(row1, row2));
+        ASSERT_EQ(-1, descending->CompareTo(row2, row1));
+        ASSERT_EQ(0, descending->CompareTo(row1, row1));
+        ASSERT_EQ(0, descending->CompareTo(row2, row2));
+    }
+
+    /// Convenience overload: sorts on every field, in declaration order (0, 1, ..., n - 1).
+    void CheckResult(const InternalRow& row1, const InternalRow& row2,
+                     const std::vector<std::shared_ptr<arrow::DataType>>& input_types,
+                     bool has_null = false) {
+        std::vector<int32_t> all_fields(input_types.size());
+        for (size_t idx = 0; idx < all_fields.size(); ++idx) {
+            all_fields[idx] = static_cast<int32_t>(idx);
+        }
+        CheckResult(row1, row2, input_types, all_fields, has_null);
+    }
+};
+
+// Each scope below builds two rows that differ in exactly one field, with row1 holding the
+// smaller value, and hands them to CheckResult, which expects row1 to order before row2.
+TEST_F(FieldsComparatorTest, TestSimple) {
+ auto pool = GetDefaultPool();
+ // boolean field differs: false vs true
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {false, static_cast<int8_t>(10), static_cast<int16_t>(100),
+ static_cast<int32_t>(100000)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {true, static_cast<int8_t>(10), static_cast<int16_t>(100),
+ static_cast<int32_t>(100000)},
+ pool.get());
+ CheckResult(row1, row2, {arrow::boolean(), arrow::int8(),
arrow::int16(), arrow::int32()});
+ }
+ // int8 field differs: 10 vs 20
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {true, static_cast<int8_t>(10), static_cast<int16_t>(100),
+ static_cast<int32_t>(100000)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {true, static_cast<int8_t>(20), static_cast<int16_t>(100),
+ static_cast<int32_t>(100000)},
+ pool.get());
+ CheckResult(row1, row2, {arrow::boolean(), arrow::int8(),
arrow::int16(), arrow::int32()});
+ }
+ // int32 field differs: 100000 vs 200000
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {true, static_cast<int8_t>(10), static_cast<int16_t>(100),
+ static_cast<int32_t>(100000)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {true, static_cast<int8_t>(10), static_cast<int16_t>(100),
+ static_cast<int32_t>(200000)},
+ pool.get());
+ CheckResult(row1, row2, {arrow::boolean(), arrow::int8(),
arrow::int16(), arrow::int32()});
+ }
+ // int64 field differs by one
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {static_cast<int64_t>(9223372036854775800),
static_cast<float>(100.1), 123.45},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {static_cast<int64_t>(9223372036854775801),
static_cast<float>(100.1), 123.45},
+ pool.get());
+ CheckResult(row1, row2, {arrow::int64(), arrow::float32(),
arrow::float64()});
+ }
+ // float field differs: 100.1 vs 100.4
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {static_cast<int64_t>(9223372036854775800),
static_cast<float>(100.1), 123.45},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {static_cast<int64_t>(9223372036854775800),
static_cast<float>(100.4), 123.45},
+ pool.get());
+ CheckResult(row1, row2, {arrow::int64(), arrow::float32(),
arrow::float64()});
+ }
+ // double field differs: 1.237E+2 vs 1.238E+2
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {static_cast<int64_t>(9223372036854775800),
static_cast<float>(100.1), 1.237E+2},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {static_cast<int64_t>(9223372036854775800),
static_cast<float>(100.1), 1.238E+2},
+ pool.get());
+ CheckResult(row1, row2, {arrow::int64(), arrow::float32(),
arrow::float64()});
+ }
+ // string field differs: "abandon" is a strict prefix of "abandon-"
+ {
+ auto bytes = std::make_shared<Bytes>("快乐每一天", pool.get());
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {std::string("abandon"), bytes,
TimestampType(Timestamp(1725875365442l, 120000), 9)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {std::string("abandon-"), bytes,
TimestampType(Timestamp(1725875365442l, 120000), 9)},
+ pool.get());
+ CheckResult(row1, row2,
+ {arrow::utf8(), arrow::binary(),
arrow::timestamp(arrow::TimeUnit::NANO)});
+ }
+ // binary field differs: bytes2 is bytes1 plus a trailing "!"
+ {
+ auto bytes1 = std::make_shared<Bytes>("快乐每一天", pool.get());
+ auto bytes2 = std::make_shared<Bytes>("快乐每一天!", pool.get());
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {std::string("abandon"), bytes1,
TimestampType(Timestamp(1725875365442l, 120000), 9)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {std::string("abandon"), bytes2,
TimestampType(Timestamp(1725875365442l, 120000), 9)},
+ pool.get());
+ CheckResult(row1, row2,
+ {arrow::utf8(), arrow::binary(),
arrow::timestamp(arrow::TimeUnit::NANO)});
+ }
+ // timestamp field differs only in the second Timestamp argument: 120000 vs 120100
+ {
+ auto bytes = std::make_shared<Bytes>("快乐每一天", pool.get());
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {std::string("abandon"), bytes,
TimestampType(Timestamp(1725875365442l, 120000), 9)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {std::string("abandon"), bytes,
TimestampType(Timestamp(1725875365442l, 120100), 9)},
+ pool.get());
+ CheckResult(row1, row2,
+ {arrow::utf8(), arrow::binary(),
arrow::timestamp(arrow::TimeUnit::NANO)});
+ }
+ // decimal field differs in its last digit
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {Decimal(38, 10,
DecimalUtils::StrToInt128("12345678998765432145678").value()),
+ static_cast<int32_t>(10)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {Decimal(38, 10,
DecimalUtils::StrToInt128("12345678998765432145679").value()),
+ static_cast<int32_t>(10)},
+ pool.get());
+ CheckResult(row1, row2, {arrow::decimal128(38, 10), arrow::date32()});
+ }
+ // date32 field differs: 10 vs 20
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {Decimal(38, 10,
DecimalUtils::StrToInt128("12345678998765432145678").value()),
+ static_cast<int32_t>(10)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {Decimal(38, 10,
DecimalUtils::StrToInt128("12345678998765432145678").value()),
+ static_cast<int32_t>(20)},
+ pool.get());
+ CheckResult(row1, row2, {arrow::decimal128(38, 10), arrow::date32()});
+ }
+}
+
+// Rows hold three timestamps declared with SECOND, MILLI and MICRO units; each scope makes
+// exactly one of them differ, with row1 holding the smaller value.
+TEST_F(FieldsComparatorTest, TestTimestampType) {
+ auto pool = GetDefaultPool();
+ // test ts with different precision
+ // first (SECOND) timestamp differs: 1000 vs 2000
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {TimestampType(Timestamp(1000l, 0), 0),
TimestampType(Timestamp(1000l, 0), 3),
+ TimestampType(Timestamp(1000l, 1000), 6)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {TimestampType(Timestamp(2000l, 0), 0),
TimestampType(Timestamp(1000l, 0), 3),
+ TimestampType(Timestamp(1000l, 1000), 6)},
+ pool.get());
+ CheckResult(
+ row1, row2,
+ {arrow::timestamp(arrow::TimeUnit::SECOND),
arrow::timestamp(arrow::TimeUnit::MILLI),
+ arrow::timestamp(arrow::TimeUnit::MICRO)});
+ }
+ // second (MILLI) timestamp differs: 1000 vs 1001
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {TimestampType(Timestamp(1000l, 0), 0),
TimestampType(Timestamp(1000l, 0), 3),
+ TimestampType(Timestamp(1000l, 1000), 6)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {TimestampType(Timestamp(1000l, 0), 0),
TimestampType(Timestamp(1001l, 0), 3),
+ TimestampType(Timestamp(1000l, 1000), 6)},
+ pool.get());
+ CheckResult(
+ row1, row2,
+ {arrow::timestamp(arrow::TimeUnit::SECOND),
arrow::timestamp(arrow::TimeUnit::MILLI),
+ arrow::timestamp(arrow::TimeUnit::MICRO)});
+ }
+ // third (MICRO) timestamp differs only in the second Timestamp argument: 1000 vs 2000
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {TimestampType(Timestamp(1000l, 0), 0),
TimestampType(Timestamp(1000l, 0), 3),
+ TimestampType(Timestamp(1000l, 1000), 6)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {TimestampType(Timestamp(1000l, 0), 0),
TimestampType(Timestamp(1000l, 0), 3),
+ TimestampType(Timestamp(1000l, 2000), 6)},
+ pool.get());
+ CheckResult(
+ row1, row2,
+ {arrow::timestamp(arrow::TimeUnit::SECOND),
arrow::timestamp(arrow::TimeUnit::MILLI),
+ arrow::timestamp(arrow::TimeUnit::MICRO)});
+ }
+ // same rows as the previous scope, but the Arrow types carry the local timezone name
+ {
+ auto timezone = DateTimeUtils::GetLocalTimezoneName();
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {TimestampType(Timestamp(1000l, 0), 0),
TimestampType(Timestamp(1000l, 0), 3),
+ TimestampType(Timestamp(1000l, 1000), 6)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {TimestampType(Timestamp(1000l, 0), 0),
TimestampType(Timestamp(1000l, 0), 3),
+ TimestampType(Timestamp(1000l, 2000), 6)},
+ pool.get());
+ CheckResult(row1, row2,
+ {arrow::timestamp(arrow::TimeUnit::SECOND, timezone),
+ arrow::timestamp(arrow::TimeUnit::MILLI, timezone),
+ arrow::timestamp(arrow::TimeUnit::MICRO, timezone)});
+ }
+}
+// Null handling. Every case passes has_null=true, so CheckResult only exercises the
+// ascending comparator; row1 is always the row expected to order first.
+TEST_F(FieldsComparatorTest, TestNull) {
+    auto pool = GetDefaultPool();
+    {
+        // Field 0 ties; a null in field 1 is expected to order before the value 21.
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow({false, NullType()}, pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow({false, 21}, pool.get());
+        CheckResult(row1, row2, {arrow::boolean(), arrow::int32()}, /*has_null=*/true);
+    }
+    {
+        // Field 0 is null in both rows (a tie); the boolean in field 1 decides.
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow({NullType(), false}, pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow({NullType(), true}, pool.get());
+        CheckResult(row1, row2, {arrow::int32(), arrow::boolean()}, /*has_null=*/true);
+    }
+    {
+        // Sort order {1, 0}: field 1 is compared first, null vs 21, although field 0 alone
+        // (true vs false) would order the rows the other way round.
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow({true, NullType()}, pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow({false, 21}, pool.get());
+        CheckResult(row1, row2, {arrow::boolean(), arrow::int32()}, {1, 0}, /*has_null=*/true);
+    }
+    {
+        // Sort order {1, 0}: field 1 is compared first and already differs (false vs true).
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow({21, false}, pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow({NullType(), true}, pool.get());
+        CheckResult(row1, row2, {arrow::int32(), arrow::boolean()}, {1, 0}, /*has_null=*/true);
+    }
+    {
+        // Sort order {1, 0}: field 1 is null in both rows (a tie); the boolean in field 0
+        // decides. The declared types follow the generated layout {boolean, int32}; the
+        // original declared {int32, boolean}, which did not match the row data.
+        BinaryRow row1 = BinaryRowGenerator::GenerateRow({false, NullType()}, pool.get());
+        BinaryRow row2 = BinaryRowGenerator::GenerateRow({true, NullType()}, pool.get());
+        CheckResult(row1, row2, {arrow::boolean(), arrow::int32()}, {1, 0}, /*has_null=*/true);
+    }
+}
+
+// Compares on sort fields {3, 2} only. Fields 0 and 1 are larger in row1 in both scopes,
+// yet CheckResult still expects row1 to order before row2.
+TEST_F(FieldsComparatorTest, TestWithSortFields) {
+ auto pool = GetDefaultPool();
+ // field 3 differs (100000 vs 200000) while field 2 is larger in row1 (200 vs 100)
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {true, static_cast<int8_t>(10), static_cast<int16_t>(200),
+ static_cast<int32_t>(100000)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {false, static_cast<int8_t>(5), static_cast<int16_t>(100),
+ static_cast<int32_t>(200000)},
+ pool.get());
+ CheckResult(row1, row2, {arrow::boolean(), arrow::int8(),
arrow::int16(), arrow::int32()},
+ {3, 2});
+ }
+ // field 3 ties (100000), so field 2 decides (100 vs 200)
+ {
+ BinaryRow row1 = BinaryRowGenerator::GenerateRow(
+ {true, static_cast<int8_t>(10), static_cast<int16_t>(100),
+ static_cast<int32_t>(100000)},
+ pool.get());
+ BinaryRow row2 = BinaryRowGenerator::GenerateRow(
+ {false, static_cast<int8_t>(5), static_cast<int16_t>(200),
+ static_cast<int32_t>(100000)},
+ pool.get());
+ CheckResult(row1, row2, {arrow::boolean(), arrow::int8(),
arrow::int16(), arrow::int32()},
+ {3, 2});
+ }
+}
+
+// Creating a comparator over a map-typed field must fail, and the error message must name
+// the offending type and field index.
+TEST_F(FieldsComparatorTest, TestInvalidType) {
+ auto map_type = arrow::map(arrow::int8(), arrow::int16());
+ ASSERT_NOK_WITH_MSG(FieldsComparator::Create({DataField(0,
arrow::field("f0", arrow::int32())),
+ DataField(1,
arrow::field("f1", map_type))},
+ /*is_ascending_order=*/true),
+ "Do not support comparing map<int8, int16> type in idx
1");
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/utils/internal_row_utils.h
b/src/paimon/common/utils/internal_row_utils.h
new file mode 100644
index 0000000..2a68724
--- /dev/null
+++ b/src/paimon/common/utils/internal_row_utils.h
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/type.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Static helpers that convert between string arrays and standard containers, and that
+/// build per-field getters from an Arrow schema. The class cannot be instantiated.
+class InternalRowUtils {
+ public:
+    InternalRowUtils() = delete;
+    ~InternalRowUtils() = delete;
+
+    /// Copies every element of `data` into a vector; null elements become `std::nullopt`.
+    ///
+    /// @param data source array; must not be null (it is dereferenced unconditionally)
+    static std::vector<std::optional<std::string>> FromStringArrayData(const InternalArray* data) {
+        // Read the size once instead of on every loop iteration.
+        const int32_t size = data->Size();
+        std::vector<std::optional<std::string>> strs;
+        strs.reserve(size);
+        for (int32_t i = 0; i < size; i++) {
+            if (data->IsNullAt(i)) {
+                strs.emplace_back(std::nullopt);
+            } else {
+                strs.emplace_back(data->GetString(i).ToString());
+            }
+        }
+        return strs;
+    }
+
+    /// Copies every element of `data` into a vector without checking for nulls.
+    ///
+    /// @param data source array; must not be null, and the caller is responsible for it
+    ///             containing no null elements
+    static std::vector<std::string> FromNotNullStringArrayData(const InternalArray* data) {
+        const int32_t size = data->Size();
+        std::vector<std::string> strs;
+        strs.reserve(size);
+        for (int32_t i = 0; i < size; i++) {
+            strs.push_back(data->GetString(i).ToString());
+        }
+        return strs;
+    }
+
+    /// Writes `strs` into a new `BinaryArray`; `std::nullopt` entries are written as nulls.
+    ///
+    /// @param strs values to write, in order
+    /// @param memory_pool pool handed to the writer and to `BinaryString::FromString`
+    static BinaryArray ToStringArrayData(const std::vector<std::optional<std::string>>& strs,
+                                         const std::shared_ptr<MemoryPool>& memory_pool) {
+        BinaryArray array;
+        // NOTE(review): 8 is the element-size argument of BinaryArrayWriter -- presumably the
+        // fixed-width slot used for string elements; confirm against BinaryArrayWriter.
+        BinaryArrayWriter writer(&array, strs.size(), 8, memory_pool.get());
+        for (size_t i = 0; i < strs.size(); i++) {
+            if (strs[i].has_value()) {
+                writer.WriteString(i, BinaryString::FromString(*strs[i], memory_pool.get()));
+            } else {
+                writer.SetNullAt(i);
+            }
+        }
+        writer.Complete();
+        return array;
+    }
+
+    /// Writes `strs` into a new `BinaryArray`; every element is written as a non-null string.
+    ///
+    /// @param strs values to write, in order
+    /// @param memory_pool pool handed to the writer and to `BinaryString::FromString`
+    static BinaryArray ToNotNullStringArrayData(const std::vector<std::string>& strs,
+                                                const std::shared_ptr<MemoryPool>& memory_pool) {
+        BinaryArray array;
+        // NOTE(review): same element-size argument (8) as in ToStringArrayData.
+        BinaryArrayWriter writer(&array, strs.size(), 8, memory_pool.get());
+        for (size_t i = 0; i < strs.size(); i++) {
+            writer.WriteString(i, BinaryString::FromString(strs[i], memory_pool.get()));
+        }
+        writer.Complete();
+        return array;
+    }
+
+    /// Creates one field getter per field of `value_schema`, in schema order.
+    ///
+    /// @param value_schema schema whose field types select the getters
+    /// @param use_view forwarded unchanged to `InternalRow::CreateFieldGetter`
+    /// @return the getters, or the first error reported by `InternalRow::CreateFieldGetter`
+    static Result<std::vector<InternalRow::FieldGetterFunc>> CreateFieldGetters(
+        const std::shared_ptr<arrow::Schema>& value_schema, bool use_view) {
+        const int32_t num_fields = value_schema->num_fields();
+        std::vector<InternalRow::FieldGetterFunc> getters;
+        getters.reserve(num_fields);
+        for (int32_t i = 0; i < num_fields; i++) {
+            const auto& field_type = value_schema->field(i)->type();
+            PAIMON_ASSIGN_OR_RAISE(InternalRow::FieldGetterFunc getter,
+                                   InternalRow::CreateFieldGetter(i, field_type, use_view));
+            // Move the callable into the vector instead of copying it.
+            getters.emplace_back(std::move(getter));
+        }
+        return getters;
+    }
+};
+} // namespace paimon
diff --git a/src/paimon/common/utils/internal_row_utils_test.cpp
b/src/paimon/common/utils/internal_row_utils_test.cpp
new file mode 100644
index 0000000..71c97ee
--- /dev/null
+++ b/src/paimon/common/utils/internal_row_utils_test.cpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/internal_row_utils.h"
+
+#include "gtest/gtest.h"
+#include "paimon/memory/memory_pool.h"
+
+namespace paimon::test {
+// Round trip: strings written to a BinaryArray must read back unchanged.
+TEST(InternalRowUtilsTest, TestToAndFromNotNullStringArrayData) {
+    auto memory_pool = GetDefaultPool();
+    std::vector<std::string> expected = {"aa", "bb", "cc", "dd"};
+    auto binary_array = InternalRowUtils::ToNotNullStringArrayData(expected, memory_pool);
+    auto actual = InternalRowUtils::FromNotNullStringArrayData(&binary_array);
+    ASSERT_EQ(expected, actual);
+}
+
+// Round trip with a null element: std::nullopt must survive the conversion in both directions.
+TEST(InternalRowUtilsTest, TestToAndFromStringArrayData) {
+    auto memory_pool = GetDefaultPool();
+    std::vector<std::optional<std::string>> expected = {"aa", std::nullopt, "bb", "cc", "dd"};
+    auto binary_array = InternalRowUtils::ToStringArrayData(expected, memory_pool);
+    auto actual = InternalRowUtils::FromStringArrayData(&binary_array);
+    ASSERT_EQ(expected, actual);
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/utils/projected_array.h
b/src/paimon/common/utils/projected_array.h
new file mode 100644
index 0000000..f157a82
--- /dev/null
+++ b/src/paimon/common/utils/projected_array.h
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class Bytes;
+class InternalMap;
+class InternalRow;
+
+/// An implementation of `InternalArray` which provides a projected view of the underlying
+/// `InternalArray`.
+///
+/// Position `pos` of the view reads element `mapping[pos]` of the wrapped array. A negative
+/// mapping entry has no source element: `IsNullAt()` reports it as null, and the value
+/// getters must not be called for it.
+class ProjectedArray : public InternalArray {
+ public:
+    /// @param array wrapped array; must not be null
+    /// @param mapping for each position of the view, the index into `array`, or a negative
+    ///                value for a position that is always null
+    ProjectedArray(const std::shared_ptr<InternalArray>& array, const std::vector<int32_t>& mapping)
+        : array_(array), mapping_(mapping) {
+        assert(array_);
+    }
+
+    /// Number of positions in the view, i.e. the length of the mapping.
+    int32_t Size() const override {
+        return static_cast<int32_t>(mapping_.size());
+    }
+
+    /// True when `pos` has no source element or the source element itself is null.
+    bool IsNullAt(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < mapping_.size());
+        const int32_t source = mapping_[pos];
+        return source < 0 || array_->IsNullAt(source);
+    }
+
+    bool GetBoolean(int32_t pos) const override {
+        return array_->GetBoolean(SourceIndex(pos));
+    }
+
+    char GetByte(int32_t pos) const override {
+        return array_->GetByte(SourceIndex(pos));
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return array_->GetShort(SourceIndex(pos));
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return array_->GetInt(SourceIndex(pos));
+    }
+
+    /// Dates are read through `GetInt()`.
+    int32_t GetDate(int32_t pos) const override {
+        return GetInt(pos);
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return array_->GetLong(SourceIndex(pos));
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return array_->GetFloat(SourceIndex(pos));
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return array_->GetDouble(SourceIndex(pos));
+    }
+
+    BinaryString GetString(int32_t pos) const override {
+        return array_->GetString(SourceIndex(pos));
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return array_->GetStringView(SourceIndex(pos));
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override {
+        return array_->GetDecimal(SourceIndex(pos), precision, scale);
+    }
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override {
+        return array_->GetTimestamp(SourceIndex(pos), precision);
+    }
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return array_->GetBinary(SourceIndex(pos));
+    }
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override {
+        return array_->GetArray(SourceIndex(pos));
+    }
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override {
+        return array_->GetMap(SourceIndex(pos));
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override {
+        return array_->GetRow(SourceIndex(pos), num_fields);
+    }
+
+    // Bulk conversions are not available on a projected view; each one returns an Invalid
+    // status.
+    Result<std::vector<char>> ToBooleanArray() const override {
+        return Status::Invalid("projected array do not support convert to boolean array");
+    }
+    Result<std::vector<char>> ToByteArray() const override {
+        return Status::Invalid("projected array do not support convert to byte array");
+    }
+    Result<std::vector<int16_t>> ToShortArray() const override {
+        return Status::Invalid("projected array do not support convert to short array");
+    }
+    Result<std::vector<int32_t>> ToIntArray() const override {
+        return Status::Invalid("projected array do not support convert to int array");
+    }
+    Result<std::vector<int64_t>> ToLongArray() const override {
+        return Status::Invalid("projected array do not support convert to long array");
+    }
+    Result<std::vector<float>> ToFloatArray() const override {
+        return Status::Invalid("projected array do not support convert to float array");
+    }
+    Result<std::vector<double>> ToDoubleArray() const override {
+        return Status::Invalid("projected array do not support convert to double array");
+    }
+
+ private:
+    /// Translates a view position into an index of the wrapped array. Debug builds assert
+    /// that `pos` is in range and that the position actually has a source element.
+    int32_t SourceIndex(int32_t pos) const {
+        // The unsigned cast also rejects negative positions.
+        assert(static_cast<size_t>(pos) < mapping_.size());
+        const int32_t source = mapping_[pos];
+        assert(source >= 0);
+        return source;
+    }
+
+    std::shared_ptr<InternalArray> array_;
+    std::vector<int32_t> mapping_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/utils/projected_array_test.cpp
b/src/paimon/common/utils/projected_array_test.cpp
new file mode 100644
index 0000000..b0dcdcf
--- /dev/null
+++ b/src/paimon/common/utils/projected_array_test.cpp
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/projected_array.h"
+
+#include <algorithm>
+#include <optional>
+#include <string>
+
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_array_writer.h"
+#include "paimon/common/data/binary_map.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+
+namespace paimon::test {
+TEST(ProjectedArrayTest, TestSimple) {  // one scope per element type: write, project, read back
+ auto pool = GetDefaultPool();
+ {
+ std::vector<bool> arr = {true, false};
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), sizeof(bool),
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteBoolean(i, arr[i]);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(projected_array.GetBoolean(1), true);
+ ASSERT_EQ(projected_array.GetBoolean(0), false);
+ ASSERT_TRUE(projected_array.IsNullAt(2));
+ }
+ {
+ std::vector<int8_t> arr = {1, 2, 3, 4, 5};
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), sizeof(int8_t),
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteByte(i, arr[i]);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {4, 3, 2, 1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(projected_array.GetByte(0), 5);
+ ASSERT_EQ(projected_array.GetByte(1), 4);
+ ASSERT_EQ(projected_array.GetByte(4), 1);
+ ASSERT_TRUE(projected_array.IsNullAt(5));
+ }
+ {
+ std::vector<int16_t> arr = {1, 2, 3, 4, 5};
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), sizeof(int16_t),
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteShort(i, arr[i]);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {4, 3, 2, 1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(projected_array.GetShort(0), 5);
+ ASSERT_EQ(projected_array.GetShort(1), 4);
+ ASSERT_EQ(projected_array.GetShort(4), 1);
+ ASSERT_TRUE(projected_array.IsNullAt(5));
+ }
+ {
+ std::vector<int32_t> arr = {1, 2, 3, 4, 5};
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), sizeof(int32_t),
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteInt(i, arr[i]);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {4, 3, 2, 1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(projected_array.GetInt(0), 5);
+ ASSERT_EQ(projected_array.GetInt(1), 4);
+ ASSERT_EQ(projected_array.GetInt(4), 1);
+ ASSERT_TRUE(projected_array.IsNullAt(5));
+ }
+ {
+ // test date (values are written with WriteInt and read back through GetDate)
+ std::vector<int32_t> arr = {10000, 20006};
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), sizeof(int32_t),
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteInt(i, arr[i]);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(projected_array.GetDate(0), 20006);
+ ASSERT_EQ(projected_array.GetDate(1), 10000);
+ ASSERT_TRUE(projected_array.IsNullAt(2));
+ }
+ {
+ std::vector<float> arr = {1.0, 2.0};
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), sizeof(float),
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteFloat(i, arr[i]);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(projected_array.GetFloat(0), 2.0);
+ ASSERT_EQ(projected_array.GetFloat(1), 1.0);
+ ASSERT_TRUE(projected_array.IsNullAt(2));
+ }
+ {
+ std::vector<double> arr = {1.0, 2.0};
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), sizeof(double),
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteDouble(i, arr[i]);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(projected_array.GetDouble(0), 2.0);
+ ASSERT_EQ(projected_array.GetDouble(1), 1.0);
+ ASSERT_TRUE(projected_array.IsNullAt(2));
+ }
+ // decimal
+ {
+ // compact (precision <= 18)
+ std::vector<Decimal> arr = {Decimal(6, 2, 123456), Decimal(6, 3,
123456)};
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), /*element_size=*/8,
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteDecimal(i, arr[i], 6);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(projected_array.GetDecimal(0, 6, 3), Decimal(6, 3, 123456));
+ ASSERT_EQ(projected_array.GetDecimal(1, 6, 2), Decimal(6, 2, 123456));
+ ASSERT_TRUE(projected_array.IsNullAt(2));
+ }
+ {
+ // not compact (precision > 18)
+ std::vector<Decimal> arr = {Decimal(/*precision=*/20, /*scale=*/3,
123456),
+ Decimal(/*precision=*/20, /*scale=*/3,
123456789)};
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), /*element_size=*/8,
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteDecimal(i, arr[i], /*precision=*/20);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(projected_array.GetDecimal(0, 20, 3), Decimal(20, 3,
123456789));
+ ASSERT_EQ(projected_array.GetDecimal(1, 20, 3), Decimal(20, 3,
123456));
+ ASSERT_TRUE(projected_array.IsNullAt(2));
+ }
+ // timestamp
+ {
+ std::vector<Timestamp> arr = {Timestamp(0, 0), Timestamp(12345, 1)};
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), /*element_size=*/8,
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteTimestamp(i, arr[i], 9);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(projected_array.GetTimestamp(0, 9), Timestamp(12345, 1));
+ ASSERT_EQ(projected_array.GetTimestamp(1, 9), Timestamp(0, 0));
+ ASSERT_TRUE(projected_array.IsNullAt(2));
+ }
+ // binary
+ {
+ std::vector<Bytes> arr;
+ arr.emplace_back("hello", pool.get());
+ arr.emplace_back("world", pool.get());
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), /*element_size=*/8,
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteBinary(i, arr[i]);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(*projected_array.GetBinary(0), arr[1]);
+ ASSERT_EQ(*projected_array.GetBinary(1), arr[0]);
+ ASSERT_TRUE(projected_array.IsNullAt(2));
+ }
+ // string
+ {
+ std::vector<BinaryString> arr;
+ arr.push_back(BinaryString::FromString("hello", pool.get()));
+ arr.push_back(BinaryString::FromString("world", pool.get()));
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer =
+ BinaryArrayWriter(array.get(), arr.size(), /*element_size=*/8,
pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteString(i, arr[i]);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+ ASSERT_EQ(projected_array.GetString(0), arr[1]);
+ ASSERT_EQ(projected_array.GetString(1), arr[0]);
+ ASSERT_TRUE(projected_array.IsNullAt(2));
+ }
+ // array
+ {
+ // element1
+ std::vector<int32_t> arr1 = {1, 2};
+ auto array1 = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer1 =
+ BinaryArrayWriter(array1.get(), arr1.size(), sizeof(int32_t),
pool.get());
+ for (size_t i = 0; i < arr1.size(); i++) {
+ writer1.WriteInt(i, arr1[i]);
+ }
+ writer1.Complete();
+ // element2
+ std::vector<int32_t> arr2 = {100, 200};
+ auto array2 = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer2 =
+ BinaryArrayWriter(array2.get(), arr2.size(), sizeof(int32_t),
pool.get());
+ for (size_t i = 0; i < arr2.size(); i++) {
+ writer2.WriteInt(i, arr2[i]);
+ }
+ writer2.Complete();
+ // array
+ std::vector<std::shared_ptr<BinaryArray>> arr;
+ arr.push_back(array1);
+ arr.push_back(array2);
+ auto array = std::make_shared<BinaryArray>();
+ BinaryArrayWriter writer = BinaryArrayWriter(array.get(), arr.size(),
8, pool.get());
+ for (size_t i = 0; i < arr.size(); i++) {
+ writer.WriteArray(i, *arr[i]);
+ }
+ writer.Complete();
+
+ std::vector<int32_t> mapping = {1, 0, -1};
+ ProjectedArray projected_array(array, mapping);
+
+ auto ret0 =
std::dynamic_pointer_cast<BinaryArray>(projected_array.GetArray(0));
+ auto ret1 =
std::dynamic_pointer_cast<BinaryArray>(projected_array.GetArray(1));
+ ASSERT_TRUE(ret0);
+ ASSERT_TRUE(ret1);
+ ASSERT_EQ(*ret0, *arr[1]);
+ ASSERT_EQ(*ret1, *arr[0]);
+ ASSERT_TRUE(projected_array.IsNullAt(2));
+ }
+}
+
+// Verifies that GetStringView resolves positions through the mapping and that a -1 entry
+// reads as null.
+TEST(ProjectedArrayTest, TestGetStringView) {
+    auto pool = GetDefaultPool();
+    std::vector<BinaryString> strings;
+    strings.push_back(BinaryString::FromString("hello", pool.get()));
+    strings.push_back(BinaryString::FromString("world", pool.get()));
+
+    auto source = std::make_shared<BinaryArray>();
+    BinaryArrayWriter writer(source.get(), strings.size(), /*element_size=*/8, pool.get());
+    size_t index = 0;
+    for (const auto& value : strings) {
+        writer.WriteString(index, value);
+        ++index;
+    }
+    writer.Complete();
+
+    // Swap the two source elements and append one projected null.
+    std::vector<int32_t> mapping = {1, 0, -1};
+    ProjectedArray projected(source, mapping);
+    ASSERT_EQ(projected.GetStringView(0), "world");
+    ASSERT_EQ(projected.GetStringView(1), "hello");
+    ASSERT_TRUE(projected.IsNullAt(2));
+}
+
+// Verifies that GetMap forwards to the element selected by the mapping (keys and values) and
+// that a -1 entry reads as null.
+TEST(ProjectedArrayTest, TestGetMap) {
+    auto pool = GetDefaultPool();
+    auto key = BinaryArray::FromIntArray({1, 2, 3}, pool.get());
+    auto value = BinaryArray::FromLongArray({100ll, 200ll, 300ll}, pool.get());
+    auto map1 = BinaryMap::ValueOf(key, value, pool.get());
+
+    auto key2 = BinaryArray::FromIntArray({10, 20}, pool.get());
+    auto value2 = BinaryArray::FromLongArray({1000ll, 2000ll}, pool.get());
+    auto map2 = BinaryMap::ValueOf(key2, value2, pool.get());
+
+    auto array = std::make_shared<BinaryArray>();
+    BinaryArrayWriter writer =
+        BinaryArrayWriter(array.get(), /*num_elements=*/2, /*element_size=*/8, pool.get());
+    writer.WriteMap(0, *map1);
+    writer.WriteMap(1, *map2);
+    writer.Complete();
+
+    std::vector<int32_t> mapping = {1, 0, -1};
+    ProjectedArray projected_array(array, mapping);
+
+    // mapping[0] -> original index 1 -> map2
+    auto result0 = projected_array.GetMap(0);
+    // Guard the dereferences below so a null result fails the test instead of crashing it.
+    ASSERT_TRUE(result0);
+    ASSERT_EQ(result0->Size(), 2);
+    ASSERT_EQ(result0->KeyArray()->ToIntArray().value(), (std::vector<int32_t>{10, 20}));
+    ASSERT_EQ(result0->ValueArray()->ToLongArray().value(),
+              (std::vector<int64_t>{1000ll, 2000ll}));
+
+    // mapping[1] -> original index 0 -> map1
+    auto result1 = projected_array.GetMap(1);
+    ASSERT_TRUE(result1);
+    ASSERT_EQ(result1->Size(), 3);
+    ASSERT_EQ(result1->KeyArray()->ToIntArray().value(), (std::vector<int32_t>{1, 2, 3}));
+    ASSERT_EQ(result1->ValueArray()->ToLongArray().value(),
+              (std::vector<int64_t>{100ll, 200ll, 300ll}));
+
+    ASSERT_TRUE(projected_array.IsNullAt(2));
+}
+
+// Verifies that GetRow forwards to the element selected by the mapping and that a -1 entry
+// reads as null.
+TEST(ProjectedArrayTest, TestGetRow) {
+    auto pool = GetDefaultPool();
+    BinaryRow alice = BinaryRowGenerator::GenerateRow({std::string("Alice"), 30}, pool.get());
+    BinaryRow bob = BinaryRowGenerator::GenerateRow({std::string("Bob"), 40}, pool.get());
+
+    auto source = std::make_shared<BinaryArray>();
+    BinaryArrayWriter writer(source.get(), /*num_elements=*/2, /*element_size=*/8, pool.get());
+    writer.WriteRow(0, alice);
+    writer.WriteRow(1, bob);
+    writer.Complete();
+
+    std::vector<int32_t> mapping = {1, 0, -1};
+    ProjectedArray projected(source, mapping);
+
+    // Projected position 0 resolves to source element 1 (bob).
+    auto first = std::dynamic_pointer_cast<BinaryRow>(projected.GetRow(0, 2));
+    ASSERT_TRUE(first);
+    ASSERT_EQ(*first, bob);
+
+    // Projected position 1 resolves to source element 0 (alice).
+    auto second = std::dynamic_pointer_cast<BinaryRow>(projected.GetRow(1, 2));
+    ASSERT_TRUE(second);
+    ASSERT_EQ(*second, alice);
+
+    ASSERT_TRUE(projected.IsNullAt(2));
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/common/utils/projected_row.h
b/src/paimon/common/utils/projected_row.h
new file mode 100644
index 0000000..cf6e146
--- /dev/null
+++ b/src/paimon/common/utils/projected_row.h
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "fmt/format.h"
+#include "fmt/ranges.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/data/internal_row.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class Bytes;
+class InternalArray;
+class InternalMap;
+
+/// An implementation of `InternalRow` which provides a projected view of the underlying
+/// `InternalRow`.
+///
+/// Projection includes both reducing the accessible fields and reordering them.
+///
+/// Field `pos` of the view resolves to field `mapping[pos]` of the wrapped row. A negative
+/// mapping entry denotes a field that is always null: `IsNullAt()` reports it as null and the
+/// typed getters have no value to return for it, so callers must consult `IsNullAt()` before
+/// calling a getter (enforced by `assert` in debug builds).
+///
+/// @note This class supports only top-level projections, not nested projections.
+class ProjectedRow : public InternalRow {
+ public:
+    // e.g., mapping = [2, 1, -1],
+    // GetInt(pos = 0) return inner row->GetInt(pos = 2)
+    // -1 in mapping indicates null, IsNullAt(pos = 2) return true
+    ProjectedRow(const std::shared_ptr<InternalRow>& row, const std::vector<int32_t>& mapping)
+        : row_(row), mapping_(mapping) {
+        assert(row_);
+    }
+
+    /// Number of fields exposed by the projection, i.e. the length of the mapping.
+    int32_t GetFieldCount() const override {
+        // Explicit cast: the interface reports the count as int32_t while size() is size_t.
+        return static_cast<int32_t>(mapping_.size());
+    }
+
+    /// The row kind is not projected; it is read from the wrapped row.
+    Result<const RowKind*> GetRowKind() const override {
+        return row_->GetRowKind();
+    }
+
+    /// The row kind is not projected; it is written through to the wrapped row.
+    void SetRowKind(const RowKind* kind) override {
+        row_->SetRowKind(kind);
+    }
+
+    /// A field is null when its mapping entry is negative or when the wrapped row reports the
+    /// mapped field as null.
+    bool IsNullAt(int32_t pos) const override {
+        assert(static_cast<size_t>(pos) < mapping_.size());
+        const int32_t source = mapping_[pos];
+        return source < 0 || row_->IsNullAt(source);
+    }
+
+    bool GetBoolean(int32_t pos) const override {
+        return row_->GetBoolean(MappedIndex(pos));
+    }
+
+    char GetByte(int32_t pos) const override {
+        return row_->GetByte(MappedIndex(pos));
+    }
+
+    int16_t GetShort(int32_t pos) const override {
+        return row_->GetShort(MappedIndex(pos));
+    }
+
+    int32_t GetInt(int32_t pos) const override {
+        return row_->GetInt(MappedIndex(pos));
+    }
+
+    int32_t GetDate(int32_t pos) const override {
+        // Forward to the wrapped row's own GetDate (rather than routing through GetInt) so the
+        // view stays transparent for rows whose date accessor differs from their int accessor.
+        return row_->GetDate(MappedIndex(pos));
+    }
+
+    int64_t GetLong(int32_t pos) const override {
+        return row_->GetLong(MappedIndex(pos));
+    }
+
+    float GetFloat(int32_t pos) const override {
+        return row_->GetFloat(MappedIndex(pos));
+    }
+
+    double GetDouble(int32_t pos) const override {
+        return row_->GetDouble(MappedIndex(pos));
+    }
+
+    BinaryString GetString(int32_t pos) const override {
+        return row_->GetString(MappedIndex(pos));
+    }
+
+    std::string_view GetStringView(int32_t pos) const override {
+        return row_->GetStringView(MappedIndex(pos));
+    }
+
+    Decimal GetDecimal(int32_t pos, int32_t precision, int32_t scale) const override {
+        return row_->GetDecimal(MappedIndex(pos), precision, scale);
+    }
+
+    Timestamp GetTimestamp(int32_t pos, int32_t precision) const override {
+        return row_->GetTimestamp(MappedIndex(pos), precision);
+    }
+
+    std::shared_ptr<Bytes> GetBinary(int32_t pos) const override {
+        return row_->GetBinary(MappedIndex(pos));
+    }
+
+    std::shared_ptr<InternalArray> GetArray(int32_t pos) const override {
+        return row_->GetArray(MappedIndex(pos));
+    }
+
+    std::shared_ptr<InternalMap> GetMap(int32_t pos) const override {
+        return row_->GetMap(MappedIndex(pos));
+    }
+
+    std::shared_ptr<InternalRow> GetRow(int32_t pos, int32_t num_fields) const override {
+        return row_->GetRow(MappedIndex(pos), num_fields);
+    }
+
+    /// Renders the row kind, the index mapping and the wrapped row's own string form.
+    std::string ToString() const override {
+        auto row_kind = row_->GetRowKind();
+        std::string row_kind_str =
+            row_kind.ok() ? row_kind.value()->ShortString() : "unknown row kind";
+        return fmt::format("{} {{ indexMapping={}, mutableRow={} }}", row_kind_str,
+                           fmt::join(mapping_, ", "), row_->ToString());
+    }
+
+ private:
+    /// Translates a projected position into the wrapped row's field index.
+    ///
+    /// Debug builds assert that `pos` is inside the mapping and that the entry is not the
+    /// negative null marker, which must never be forwarded to the wrapped row as an index.
+    int32_t MappedIndex(int32_t pos) const {
+        assert(static_cast<size_t>(pos) < mapping_.size());
+        const int32_t source = mapping_[pos];
+        assert(source >= 0);
+        return source;
+    }
+
+    std::shared_ptr<InternalRow> row_;
+    std::vector<int32_t> mapping_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/common/utils/projected_row_test.cpp
b/src/paimon/common/utils/projected_row_test.cpp
new file mode 100644
index 0000000..ade7782
--- /dev/null
+++ b/src/paimon/common/utils/projected_row_test.cpp
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/utils/projected_row.h"
+
+#include <utility>
+#include <variant>
+
+#include "arrow/api.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/columnar/columnar_map.h"
+#include "paimon/common/data/generic_row.h"
+#include "paimon/common/data/internal_array.h"
+#include "paimon/common/data/internal_map.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+
+namespace paimon::test {
+// Builds a 16-field GenericRow covering every supported field type, projects it in reverse
+// order with four trailing -1 (null) entries, and checks every getter, the row-kind
+// pass-through and ToString().
+TEST(ProjectedRowTest, TestSimple) {
+    auto pool = GetDefaultPool();
+    auto row = std::make_shared<GenericRow>(16);
+    row->SetField(0, true);
+    row->SetField(1, static_cast<char>(1));
+    row->SetField(2, static_cast<int16_t>(2));
+    row->SetField(3, static_cast<int32_t>(3));
+    row->SetField(4, static_cast<int64_t>(4));
+    row->SetField(5, static_cast<float>(5.1));
+    row->SetField(6, 6.12);
+    auto str = BinaryString::FromString("abcd", pool.get());
+    row->SetField(7, str);
+    std::shared_ptr<Bytes> bytes = Bytes::AllocateBytes("efgh", pool.get());
+    row->SetField(8, bytes);
+    std::string str9 = "apple";
+    row->SetField(9, std::string_view(str9.data(), str9.size()));
+
+    Timestamp ts(100, 20);
+    row->SetField(10, ts);
+    Decimal decimal(/*precision=*/30, /*scale=*/20,
+                    DecimalUtils::StrToInt128("12345678998765432145678").value());
+    row->SetField(11, decimal);
+
+    auto array = std::make_shared<BinaryArray>(BinaryArray::FromLongArray(
+        {static_cast<int64_t>(10), static_cast<int64_t>(20)}, pool.get()));
+    row->SetField(12, array);
+
+    std::shared_ptr<InternalRow> binary_row =
+        BinaryRowGenerator::GenerateRowPtr({100, 200}, pool.get());
+    row->SetField(13, binary_row);
+
+    auto key = arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), "[1, 2, 3]").ValueOrDie();
+    auto value =
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::int64(), "[2, 4, 6]").ValueOrDie();
+    auto map = std::make_shared<ColumnarMap>(key, value, pool, /*offset=*/0, /*length=*/3);
+    row->SetField(14, map);
+    // do not set value at pos 15, therefore, pos 15 is null
+
+    std::vector<int32_t> mapping = {15, 14, 13, 12, 11, 10, 9,  8,  7,  6,
+                                    5,  4,  3,  2,  1,  0,  -1, -1, -1, -1};
+
+    ProjectedRow projected_row(row, mapping);
+
+    // test row kind: reads and writes go straight through to the wrapped row
+    ASSERT_EQ(projected_row.GetRowKind().value(), RowKind::Insert());
+    projected_row.SetRowKind(RowKind::Delete());
+    ASSERT_EQ(projected_row.GetRowKind().value(), RowKind::Delete());
+    ASSERT_EQ(row->GetRowKind().value(), RowKind::Delete());
+
+    ASSERT_EQ(projected_row.GetFieldCount(), 20);
+
+    // the four trailing -1 entries are projected nulls
+    ASSERT_TRUE(projected_row.IsNullAt(19));
+    ASSERT_TRUE(projected_row.IsNullAt(18));
+    ASSERT_TRUE(projected_row.IsNullAt(17));
+    ASSERT_TRUE(projected_row.IsNullAt(16));
+
+    ASSERT_EQ(projected_row.GetBoolean(15), true);
+    ASSERT_EQ(projected_row.GetByte(14), static_cast<char>(1));
+    ASSERT_EQ(projected_row.GetShort(13), static_cast<int16_t>(2));
+    ASSERT_EQ(projected_row.GetInt(12), static_cast<int32_t>(3));
+    ASSERT_EQ(projected_row.GetDate(12), static_cast<int32_t>(3));
+    ASSERT_EQ(projected_row.GetLong(11), static_cast<int64_t>(4));
+    ASSERT_EQ(projected_row.GetFloat(10), static_cast<float>(5.1));
+    ASSERT_EQ(projected_row.GetDouble(9), static_cast<double>(6.12));
+    ASSERT_EQ(projected_row.GetString(8), str);
+    ASSERT_EQ(*projected_row.GetBinary(7), *bytes);
+    ASSERT_EQ(std::string(projected_row.GetStringView(6)), str9);
+    ASSERT_EQ(projected_row.GetTimestamp(5, /*precision=*/9), ts);
+    ASSERT_EQ(projected_row.GetDecimal(4, /*precision=*/30, /*scale=*/20), decimal);
+
+    // Null-check every pointer result before dereferencing it, so that a regression shows up
+    // as a test failure rather than a crash of the test binary.
+    auto projected_array = projected_row.GetArray(3);
+    ASSERT_TRUE(projected_array);
+    ASSERT_EQ(projected_array->ToLongArray().value(), array->ToLongArray().value());
+
+    auto binary_row_result = std::dynamic_pointer_cast<BinaryRow>(projected_row.GetRow(2, 2));
+    auto binary_row_expected = std::dynamic_pointer_cast<BinaryRow>(binary_row);
+    ASSERT_TRUE(binary_row_result);
+    ASSERT_TRUE(binary_row_expected);
+    ASSERT_EQ(*binary_row_result, *binary_row_expected);
+
+    auto projected_map = projected_row.GetMap(1);
+    ASSERT_TRUE(projected_map);
+    ASSERT_EQ(projected_map->KeyArray()->ToIntArray().value(),
+              map->KeyArray()->ToIntArray().value());
+    ASSERT_EQ(projected_map->ValueArray()->ToLongArray().value(),
+              map->ValueArray()->ToLongArray().value());
+
+    // position 0 maps to source field 15, which was never set
+    ASSERT_TRUE(projected_row.IsNullAt(0));
+
+    ASSERT_EQ(
+        "-D { indexMapping=15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, -1, -1, -1, -1, "
+        "mutableRow=(true,1,2,3,4,5.100000,6.120000,abcd,efgh,apple,1970-01-01 "
+        "00:00:00.100000020,123.45678998765432145678,array,row,map,null) }",
+        projected_row.ToString());
+}
+} // namespace paimon::test
diff --git a/src/paimon/common/utils/rapidjson_util.h
b/src/paimon/common/utils/rapidjson_util.h
index d413f25..7a6c440 100644
--- a/src/paimon/common/utils/rapidjson_util.h
+++ b/src/paimon/common/utils/rapidjson_util.h
@@ -1,20 +1,17 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Copyright 2024-present Alibaba Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
#pragma once
diff --git a/src/paimon/common/utils/rapidjson_util_test.cpp
b/src/paimon/common/utils/rapidjson_util_test.cpp
index c5011d9..b143653 100644
--- a/src/paimon/common/utils/rapidjson_util_test.cpp
+++ b/src/paimon/common/utils/rapidjson_util_test.cpp
@@ -1,20 +1,17 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Copyright 2024-present Alibaba Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
#include "paimon/common/utils/rapidjson_util.h"