This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 687ca03 feat(parquet): project arrow array for parquet (#165)
687ca03 is described below
commit 687ca032068a3232cc00699ec76cb49f74393354
Author: Gang Wu <[email protected]>
AuthorDate: Tue Aug 19 00:40:05 2025 +0800
feat(parquet): project arrow array for parquet (#165)
---
src/iceberg/parquet/parquet_data_util.cc | 255 +++++++++++-
src/iceberg/parquet/parquet_data_util_internal.h | 11 +-
src/iceberg/parquet/parquet_reader.cc | 11 +-
test/CMakeLists.txt | 6 +-
test/parquet_data_test.cc | 506 +++++++++++++++++++++++
5 files changed, 774 insertions(+), 15 deletions(-)
diff --git a/src/iceberg/parquet/parquet_data_util.cc
b/src/iceberg/parquet/parquet_data_util.cc
index 6237b00..c822daa 100644
--- a/src/iceberg/parquet/parquet_data_util.cc
+++ b/src/iceberg/parquet/parquet_data_util.cc
@@ -17,15 +17,266 @@
* under the License.
*/
+#include <arrow/array.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/compute/api.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+
+#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/parquet/parquet_data_util_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_util.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
namespace iceberg::parquet {
+namespace {
+
+// Forward declaration
+Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
+ const std::shared_ptr<::arrow::Array>& array,
+ const std::shared_ptr<::arrow::DataType>& output_arrow_type,
+ const NestedType& nested_type, std::span<const FieldProjection>
projections,
+ ::arrow::MemoryPool* pool);
+
+/// \brief Create a null array of the given type and length.
+Result<std::shared_ptr<::arrow::Array>> MakeNullArray(
+ const std::shared_ptr<::arrow::DataType>& type, int64_t length,
+ ::arrow::MemoryPool* pool) {
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto builder, ::arrow::MakeBuilder(type,
pool));
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->AppendNulls(length));
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto array, builder->Finish());
+ return array;
+}
+
+Result<std::shared_ptr<::arrow::Array>> ProjectPrimitiveArray(
+ const std::shared_ptr<::arrow::Array>& array,
+ const std::shared_ptr<::arrow::DataType>& output_arrow_type,
+ ::arrow::MemoryPool* pool) {
+ if (array->type()->Equals(output_arrow_type)) {
+ return array;
+ }
+
+ // Use Arrow compute cast function for type conversions.
+ // Note: We don't check the schema evolution rule again because projecting
schemas
+ // has checked this.
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto cast_result,
+ ::arrow::compute::Cast(array,
output_arrow_type));
+ return cast_result.make_array();
+}
+
+Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
+ const std::shared_ptr<::arrow::StructArray>& struct_array,
+ const std::shared_ptr<::arrow::StructType>& output_struct_type,
+ const StructType& struct_type, std::span<const FieldProjection>
projections,
+ ::arrow::MemoryPool* pool) {
+ if (struct_type.fields().size() != projections.size()) {
+ return InvalidSchema(
+ "Inconsistent number of fields ({}) and number of projections ({})",
+ struct_type.fields().size(), projections.size());
+ }
+ if (struct_type.fields().size() != output_struct_type->num_fields()) {
+ return InvalidSchema(
+ "Inconsistent number of fields ({}) and number of output fields ({})",
+ struct_type.fields().size(), output_struct_type->num_fields());
+ }
+
+ std::vector<std::shared_ptr<::arrow::Array>> projected_arrays;
+ projected_arrays.reserve(projections.size());
+
+ for (size_t i = 0; i < projections.size(); ++i) {
+ const auto& projected_field = struct_type.fields()[i];
+ const auto& field_projection = projections[i];
+ const auto& output_arrow_type = output_struct_type->fields()[i]->type();
+
+ std::shared_ptr<::arrow::Array> projected_array;
+
+ if (field_projection.kind == FieldProjection::Kind::kProjected) {
+ auto parquet_field_index =
+ static_cast<int>(std::get<size_t>(field_projection.from));
+ if (parquet_field_index >= struct_array->num_fields()) {
+ return InvalidArgument("Parquet field index {} out of bound {}",
+ parquet_field_index,
struct_array->num_fields());
+ }
+ const auto& parquet_array = struct_array->field(parquet_field_index);
+ if (projected_field.type()->is_nested()) {
+ const auto& nested_type =
+ internal::checked_cast<const NestedType&>(*projected_field.type());
+ ICEBERG_ASSIGN_OR_RAISE(
+ projected_array,
+ ProjectNestedArray(parquet_array, output_arrow_type, nested_type,
+ field_projection.children, pool));
+ } else {
+ ICEBERG_ASSIGN_OR_RAISE(
+ projected_array,
+ ProjectPrimitiveArray(parquet_array, output_arrow_type, pool));
+ }
+ } else if (field_projection.kind == FieldProjection::Kind::kNull) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ projected_array,
+ MakeNullArray(output_arrow_type, struct_array->length(), pool));
+ } else {
+ return NotImplemented("Unsupported field projection kind: {}",
+ ToString(field_projection.kind));
+ }
+
+ projected_arrays.emplace_back(std::move(projected_array));
+ }
+
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(
+ auto output_array,
+ ::arrow::StructArray::Make(projected_arrays,
output_struct_type->fields(),
+ struct_array->null_bitmap(),
struct_array->null_count(),
+ struct_array->offset()));
+ return output_array;
+}
+
+/// FIXME: Support ::arrow::LargeListArray.
+Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
+ const std::shared_ptr<::arrow::ListArray>& list_array,
+ const std::shared_ptr<::arrow::ListType>& output_list_type, const
ListType& list_type,
+ std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
+ if (projections.size() != 1) {
+ return InvalidArgument("Expected 1 projection for list, got: {}",
projections.size());
+ }
+
+ const auto& element_field = list_type.fields().back();
+ const auto& element_projection = projections[0];
+ const auto& output_element_type = output_list_type->value_type();
+
+ std::shared_ptr<::arrow::Array> projected_values;
+ if (element_field.type()->is_nested()) {
+ const auto& nested_type =
+ internal::checked_cast<const NestedType&>(*element_field.type());
+ ICEBERG_ASSIGN_OR_RAISE(
+ projected_values,
+ ProjectNestedArray(list_array->values(), output_element_type,
nested_type,
+ element_projection.children, pool));
+ } else {
+ ICEBERG_ASSIGN_OR_RAISE(
+ projected_values,
+ ProjectPrimitiveArray(list_array->values(), output_element_type,
pool));
+ }
+
+ return std::make_shared<::arrow::ListArray>(
+ output_list_type, list_array->length(), list_array->value_offsets(),
+ std::move(projected_values), list_array->null_bitmap(),
list_array->null_count(),
+ list_array->offset());
+}
+
+Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
+ const std::shared_ptr<::arrow::MapArray>& map_array,
+ const std::shared_ptr<::arrow::MapType>& output_map_type, const MapType&
map_type,
+ std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
+ if (projections.size() != 2) {
+ return InvalidArgument("Expected 2 projections for map, got: {}",
projections.size());
+ }
+
+ const auto& key_projection = projections[0];
+ const auto& value_projection = projections[1];
+ const auto& key_type = map_type.key().type();
+ const auto& value_type = map_type.value().type();
+
+ // Project keys
+ std::shared_ptr<::arrow::Array> projected_keys;
+ if (key_type->is_nested()) {
+ const auto& nested_type = internal::checked_cast<const
NestedType&>(*key_type);
+ ICEBERG_ASSIGN_OR_RAISE(
+ projected_keys, ProjectNestedArray(map_array->keys(),
output_map_type->key_type(),
+ nested_type,
key_projection.children, pool));
+ } else {
+ ICEBERG_ASSIGN_OR_RAISE(
+ projected_keys,
+ ProjectPrimitiveArray(map_array->keys(), output_map_type->key_type(),
pool));
+ }
+
+ // Project values
+ std::shared_ptr<::arrow::Array> projected_items;
+ if (value_type->is_nested()) {
+ const auto& nested_type = internal::checked_cast<const
NestedType&>(*value_type);
+ ICEBERG_ASSIGN_OR_RAISE(
+ projected_items,
+ ProjectNestedArray(map_array->items(), output_map_type->item_type(),
nested_type,
+ value_projection.children, pool));
+ } else {
+ ICEBERG_ASSIGN_OR_RAISE(
+ projected_items,
+ ProjectPrimitiveArray(map_array->items(),
output_map_type->item_type(), pool));
+ }
+
+ return std::make_shared<::arrow::MapArray>(
+ output_map_type, map_array->length(), map_array->value_offsets(),
+ std::move(projected_keys), std::move(projected_items),
map_array->null_bitmap(),
+ map_array->null_count(), map_array->offset());
+}
+
+Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
+ const std::shared_ptr<::arrow::Array>& array,
+ const std::shared_ptr<::arrow::DataType>& output_arrow_type,
+ const NestedType& nested_type, std::span<const FieldProjection>
projections,
+ ::arrow::MemoryPool* pool) {
+ switch (nested_type.type_id()) {
+ case TypeId::kStruct: {
+ if (output_arrow_type->id() != ::arrow::Type::STRUCT) {
+ return InvalidSchema("Expected struct type, got: {}",
+ output_arrow_type->ToString());
+ }
+ auto struct_array =
internal::checked_pointer_cast<::arrow::StructArray>(array);
+ auto output_struct_type =
+
internal::checked_pointer_cast<::arrow::StructType>(output_arrow_type);
+ const auto& struct_type = internal::checked_cast<const
StructType&>(nested_type);
+ return ProjectStructArray(struct_array, output_struct_type, struct_type,
+ projections, pool);
+ }
+ case TypeId::kList: {
+ if (output_arrow_type->id() != ::arrow::Type::LIST) {
+ return InvalidSchema("Expected list type, got: {}",
+ output_arrow_type->ToString());
+ }
+
+ auto list_array =
internal::checked_pointer_cast<::arrow::ListArray>(array);
+ auto output_list_type =
+ internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
+ const auto& list_type = internal::checked_cast<const
ListType&>(nested_type);
+ return ProjectListArray(list_array, output_list_type, list_type,
projections, pool);
+ }
+ case TypeId::kMap: {
+ if (output_arrow_type->id() != ::arrow::Type::MAP) {
+ return InvalidSchema("Expected map type, got: {}",
output_arrow_type->ToString());
+ }
+
+ auto map_array =
internal::checked_pointer_cast<::arrow::MapArray>(array);
+ auto output_map_type =
+ internal::checked_pointer_cast<::arrow::MapType>(output_arrow_type);
+ const auto& map_type = internal::checked_cast<const
MapType&>(nested_type);
+ return ProjectMapArray(map_array, output_map_type, map_type,
projections, pool);
+ }
+ default:
+ return InvalidSchema("Cannot project array of unsupported nested type:
{}",
+ nested_type.ToString());
+ }
+}
+
+} // namespace
+
Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
    std::shared_ptr<::arrow::RecordBatch> record_batch,
    const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
    const Schema& projected_schema, const SchemaProjection& projection,
    ::arrow::MemoryPool* pool) {
  // Wrap the batch columns in a struct array so the whole projection can be
  // handled by the recursive nested-array path.
  auto input_struct = std::make_shared<::arrow::StructArray>(
      ::arrow::struct_(record_batch->schema()->fields()), record_batch->num_rows(),
      record_batch->columns());
  ICEBERG_ASSIGN_OR_RAISE(
      auto projected,
      ProjectNestedArray(input_struct, ::arrow::struct_(output_arrow_schema->fields()),
                         projected_schema, projection.fields, pool));
  // The nested projection of a struct input always yields a struct array;
  // unwrap it back into a record batch.
  auto* result_struct = internal::checked_cast<::arrow::StructArray*>(projected.get());
  return ::arrow::RecordBatch::Make(output_arrow_schema, record_batch->num_rows(),
                                    result_struct->fields());
}
} // namespace iceberg::parquet
diff --git a/src/iceberg/parquet/parquet_data_util_internal.h
b/src/iceberg/parquet/parquet_data_util_internal.h
index c222d74..4a4f881 100644
--- a/src/iceberg/parquet/parquet_data_util_internal.h
+++ b/src/iceberg/parquet/parquet_data_util_internal.h
@@ -19,12 +19,9 @@
#pragma once
-#include "iceberg/schema_util.h"
+#include <arrow/type_fwd.h>
-namespace arrow {
-class RecordBatch;
-class Schema;
-} // namespace arrow
+#include "iceberg/schema_util.h"
namespace iceberg::parquet {
@@ -34,10 +31,12 @@ namespace iceberg::parquet {
/// \param output_arrow_schema The Arrow schema to convert to.
/// \param projected_schema The projected Iceberg schema.
/// \param projection The projection from projected Iceberg schema to the
record batch.
+/// \param pool The arrow memory pool.
/// \return The converted record batch.
Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
std::shared_ptr<::arrow::RecordBatch> record_batch,
const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
- const Schema& projected_schema, const SchemaProjection& projection);
+ const Schema& projected_schema, const SchemaProjection& projection,
+ ::arrow::MemoryPool* pool);
} // namespace iceberg::parquet
diff --git a/src/iceberg/parquet/parquet_reader.cc
b/src/iceberg/parquet/parquet_reader.cc
index 6903310..3f15986 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -117,11 +117,8 @@ class ParquetReader::Impl {
split_ = options.split;
read_schema_ = options.projection;
- // TODO(gangwu): make memory pool configurable
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool();
-
// Prepare reader properties
- ::parquet::ReaderProperties reader_properties(pool);
+ ::parquet::ReaderProperties reader_properties(pool_);
::parquet::ArrowReaderProperties arrow_reader_properties;
arrow_reader_properties.set_batch_size(options.batch_size);
arrow_reader_properties.set_arrow_extensions_enabled(true);
@@ -131,7 +128,7 @@ class ParquetReader::Impl {
auto file_reader =
::parquet::ParquetFileReader::Open(std::move(input_stream),
reader_properties);
ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make(
- pool, std::move(file_reader), arrow_reader_properties, &reader_));
+ pool_, std::move(file_reader), arrow_reader_properties, &reader_));
// Project read schema onto the Parquet file schema
ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(),
*read_schema_));
@@ -152,7 +149,7 @@ class ParquetReader::Impl {
ICEBERG_ASSIGN_OR_RAISE(
batch, ProjectRecordBatch(std::move(batch),
context_->output_arrow_schema_,
- *read_schema_, projection_));
+ *read_schema_, projection_, pool_));
ArrowArray arrow_array;
ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportRecordBatch(*batch,
&arrow_array));
@@ -227,6 +224,8 @@ class ParquetReader::Impl {
}
private:
+ // TODO(gangwu): make memory pool configurable
+ ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
// The split to read from the Parquet file.
std::optional<Split> split_;
// Schema to read from the Parquet file.
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 091fa29..be4afcb 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -117,5 +117,9 @@ if(ICEBERG_BUILD_BUNDLE)
test_common.cc
in_memory_catalog_test.cc)
- add_iceberg_test(parquet_test USE_BUNDLE SOURCES parquet_schema_test.cc)
+ add_iceberg_test(parquet_test
+ USE_BUNDLE
+ SOURCES
+ parquet_data_test.cc
+ parquet_schema_test.cc)
endif()
diff --git a/test/parquet_data_test.cc b/test/parquet_data_test.cc
new file mode 100644
index 0000000..bc8f421
--- /dev/null
+++ b/test/parquet_data_test.cc
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/parquet/parquet_data_util_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/schema_util.h"
+#include "iceberg/type.h"
+#include "matchers.h"
+
+namespace iceberg::parquet {
+
/// \brief One parameterized test case: rows typed as `source_type` are
/// projected to `projected_type` and compared against `expected_json`.
struct ProjectRecordBatchParam {
  // Test-case display name, also used as the gtest parameter suffix.
  std::string name;
  // Iceberg type requested by the projected (read) schema.
  std::shared_ptr<Type> projected_type;
  // Iceberg type of the column in the source data.
  std::shared_ptr<Type> source_type;
  // Input rows in Arrow JSON format.
  std::string input_json;
  // Expected projected rows in Arrow JSON format.
  std::string expected_json;
};
+
+std::shared_ptr<::arrow::RecordBatch> RecordBatchFromJson(
+ const std::shared_ptr<::arrow::Schema>& schema, const std::string&
json_data) {
+ auto struct_type = ::arrow::struct_(schema->fields());
+ auto array = ::arrow::json::ArrayFromJSONString(struct_type,
json_data).ValueOrDie();
+ auto struct_array = std::static_pointer_cast<::arrow::StructArray>(array);
+ return ::arrow::RecordBatch::Make(schema, struct_array->length(),
+ struct_array->fields());
+}
+
+void VerifyProjectRecordBatch(const Schema& projected_schema, const Schema&
source_schema,
+ const std::string& input_json,
+ const std::string& expected_json) {
+ auto schema_projection_result =
+ Project(projected_schema, source_schema, /*prune_source=*/false);
+ ASSERT_THAT(schema_projection_result, IsOk());
+ auto schema_projection = std::move(schema_projection_result.value());
+
+ ArrowSchema source_arrow_c_schema;
+ ASSERT_THAT(ToArrowSchema(source_schema, &source_arrow_c_schema), IsOk());
+ auto source_arrow_schema =
::arrow::ImportSchema(&source_arrow_c_schema).ValueOrDie();
+ auto input_record_batch = RecordBatchFromJson(source_arrow_schema,
input_json);
+
+ ArrowSchema projected_arrow_c_schema;
+ ASSERT_THAT(ToArrowSchema(projected_schema, &projected_arrow_c_schema),
IsOk());
+ auto projected_arrow_schema =
+ ::arrow::ImportSchema(&projected_arrow_c_schema).ValueOrDie();
+
+ auto project_result =
+ ProjectRecordBatch(input_record_batch, projected_arrow_schema,
projected_schema,
+ schema_projection, ::arrow::default_memory_pool());
+ ASSERT_THAT(project_result, IsOk());
+ auto projected_record_batch = std::move(project_result.value());
+
+ auto expected_record_batch = RecordBatchFromJson(projected_arrow_schema,
expected_json);
+ ASSERT_TRUE(projected_record_batch->Equals(*expected_record_batch))
+ << "projected_record_batch: " << projected_record_batch->ToString()
+ << "\nexpected_record_batch: " << expected_record_batch->ToString();
+}
+
+class ProjectRecordBatchTest : public
::testing::TestWithParam<ProjectRecordBatchParam> {
+};
+
+TEST_P(ProjectRecordBatchTest, PrimitiveType) {
+ const auto& test_case = GetParam();
+
+ Schema projected_schema({SchemaField::MakeRequired(
+ /*field_id=*/1, /*name=*/"a", test_case.projected_type)});
+ Schema source_schema({SchemaField::MakeRequired(
+ /*field_id=*/1, /*name=*/"a", test_case.source_type)});
+
+ ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(
+ projected_schema, source_schema, test_case.input_json,
test_case.expected_json));
+}
+
// Round-trip and type-promotion cases for primitive Iceberg types.
// Identity cases verify pass-through of each supported primitive; the
// *Promotion cases exercise the schema-evolution casts (int -> long,
// float -> double, decimal precision widening).
const std::vector<ProjectRecordBatchParam> kPrimitiveTestCases = {
    {
        .name = "Boolean",
        .projected_type = boolean(),
        .source_type = boolean(),
        .input_json = R"([{"a": true}, {"a": false}, {"a": true}])",
        .expected_json = R"([{"a": true}, {"a": false}, {"a": true}])",
    },
    {
        .name = "Int",
        .projected_type = int32(),
        .source_type = int32(),
        .input_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
        .expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
    },
    {
        .name = "Long",
        .projected_type = int64(),
        .source_type = int64(),
        .input_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
        .expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
    },
    {
        .name = "Float",
        .projected_type = float32(),
        .source_type = float32(),
        .input_json = R"([{"a": 0.0}, {"a": 3.14}, {"a": 6.28}])",
        .expected_json = R"([{"a": 0.0}, {"a": 3.14}, {"a": 6.28}])",
    },
    {
        .name = "Double",
        .projected_type = float64(),
        .source_type = float64(),
        .input_json = R"([{"a": 0.0}, {"a": 1.234567890}, {"a": 2.469135780}])",
        .expected_json = R"([{"a": 0.0}, {"a": 1.234567890}, {"a": 2.469135780}])",
    },
    {
        .name = "String",
        .projected_type = string(),
        .source_type = string(),
        .input_json =
            R"([{"a": "test_string_0"}, {"a": "test_string_1"}, {"a": "test_string_2"}])",
        .expected_json =
            R"([{"a": "test_string_0"}, {"a": "test_string_1"}, {"a": "test_string_2"}])",
    },
    {
        .name = "Binary",
        .projected_type = binary(),
        .source_type = binary(),
        .input_json = R"([{"a": "abc"}, {"a": "bcd"}, {"a": "cde"}])",
        .expected_json = R"([{"a": "abc"}, {"a": "bcd"}, {"a": "cde"}])",
    },
    {
        .name = "Fixed",
        .projected_type = fixed(4),
        .source_type = fixed(4),
        .input_json = R"([{"a": "abcd"}, {"a": "bcde"}, {"a": "cdef"}])",
        .expected_json = R"([{"a": "abcd"}, {"a": "bcde"}, {"a": "cdef"}])",
    },
    {
        .name = "Decimal",
        .projected_type = decimal(10, 2),
        .source_type = decimal(10, 2),
        .input_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
        .expected_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
    },
    {
        .name = "Date",
        .projected_type = date(),
        .source_type = date(),
        .input_json = R"([{"a": 18000}, {"a": 18001}, {"a": 18002}])",
        .expected_json = R"([{"a": 18000}, {"a": 18001}, {"a": 18002}])",
    },
    {
        .name = "Time",
        .projected_type = time(),
        .source_type = time(),
        .input_json = R"([{"a": 45045123456}, {"a": 45046123456}, {"a": 45047123456}])",
        .expected_json =
            R"([{"a": 45045123456}, {"a": 45046123456}, {"a": 45047123456}])",
    },
    {
        .name = "Timestamp",
        .projected_type = timestamp(),
        .source_type = timestamp(),
        .input_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
        .expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
    },
    {
        .name = "TimestampTz",
        .projected_type = timestamp_tz(),
        .source_type = timestamp_tz(),
        .input_json =
            R"([{"a": 1672531200000000}, {"a": 1672531201000000}, {"a": 1672531202000000}])",
        .expected_json =
            R"([{"a": 1672531200000000}, {"a": 1672531201000000}, {"a": 1672531202000000}])",
    },
    {
        .name = "IntToLongPromotion",
        .projected_type = int64(),
        .source_type = int32(),
        .input_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
        .expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
    },
    {
        .name = "FloatToDoublePromotion",
        .projected_type = float64(),
        .source_type = float32(),
        .input_json = R"([{"a": 0.0}, {"a": 1.0}, {"a": 2.0}])",
        .expected_json = R"([{"a": 0.0}, {"a": 1.0}, {"a": 2.0}])",
    },
    {
        .name = "DecimalPrecisionPromotion",
        .projected_type = decimal(10, 2),
        .source_type = decimal(6, 2),
        .input_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
        .expected_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
    },
};

INSTANTIATE_TEST_SUITE_P(
    AllPrimitiveTypes, ProjectRecordBatchTest, ::testing::ValuesIn(kPrimitiveTestCases),
    [](const ::testing::TestParamInfo<ProjectRecordBatchParam>& info) {
      return info.param.name;
    });
+
+TEST(ProjectRecordBatchTest, StructWithTwoFields) {
+ Schema iceberg_schema({
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeRequired(2, "name", string()),
+ });
+
+ const std::string input_json = R"([{"id": 42, "name": "test"}])";
+ const std::string expected_json = R"([{"id": 42, "name": "test"}])";
+
+ ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(iceberg_schema,
iceberg_schema,
+ input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, NestedStruct) {
+ Schema iceberg_schema({
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeRequired(2, "person",
+
std::make_shared<StructType>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(3, "name",
string()),
+ SchemaField::MakeRequired(4, "age",
int32()),
+ })),
+ });
+
+ const std::string input_json = R"([
+ {"id": 1, "person": {"name": "Person0", "age": 25}},
+ {"id": 2, "person": {"name": "Person1", "age": 26}}
+ ])";
+
+ ASSERT_NO_FATAL_FAILURE(
+ VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json,
input_json));
+}
+
+TEST(ProjectRecordBatchTest, ListOfIntegers) {
+ Schema iceberg_schema({
+ SchemaField::MakeRequired(
+ 1, "numbers",
+ std::make_shared<ListType>(SchemaField::MakeRequired(2, "element",
int32()))),
+ });
+
+ const std::string input_json = R"([
+ {"numbers": [0, 1, 2]},
+ {"numbers": [10, 11, 12]}
+ ])";
+
+ ASSERT_NO_FATAL_FAILURE(
+ VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json,
input_json));
+}
+
+TEST(ProjectRecordBatchTest, ListOfStructs) {
+ Schema iceberg_schema({
+ SchemaField::MakeRequired(1, "people",
+
std::make_shared<ListType>(SchemaField::MakeRequired(
+ 2, "element",
+
std::make_shared<StructType>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(3, "name",
string()),
+ SchemaField::MakeRequired(4, "age",
int32()),
+ })))),
+ });
+
+ const std::string input_json = R"([
+ {"people": [
+ {"name": "Person0_0", "age": 20},
+ {"name": "Person0_1", "age": 21}
+ ]},
+ {"people": [
+ {"name": "Person1_0", "age": 30},
+ {"name": "Person1_1", "age": 31}
+ ]}
+ ])";
+
+ ASSERT_NO_FATAL_FAILURE(
+ VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json,
input_json));
+}
+
+TEST(ProjectRecordBatchTest, MapStringToInt) {
+ Schema iceberg_schema({
+ SchemaField::MakeRequired(
+ 1, "scores",
+ std::make_shared<MapType>(SchemaField::MakeRequired(2, "key",
string()),
+ SchemaField::MakeRequired(3, "value",
int32()))),
+ });
+
+ const std::string input_json = R"([
+ {"scores": [["score_0", 100], ["score_1", 105]]},
+ {"scores": [["score_2", 110], ["score_3", 115]]}
+ ])";
+
+ ASSERT_NO_FATAL_FAILURE(
+ VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json,
input_json));
+}
+
+TEST(ProjectRecordBatchTest, MapStringToStruct) {
+ Schema iceberg_schema({
+ SchemaField::MakeRequired(
+ 1, "users",
+ std::make_shared<MapType>(
+ SchemaField::MakeRequired(2, "key", string()),
+ SchemaField::MakeRequired(
+ 3, "value",
+ std::make_shared<StructType>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(4, "id", int32()),
+ SchemaField::MakeRequired(5, "email", string()),
+ })))),
+ });
+
+ const std::string input_json = R"([
+ {"users": [["user_0", {"id": 1000, "email": "[email protected]"}]]},
+ {"users": [["user_1", {"id": 1001, "email": "[email protected]"}]]}
+ ])";
+
+ ASSERT_NO_FATAL_FAILURE(
+ VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json,
input_json));
+}
+
+TEST(ProjectRecordBatchTest, StructWithMissingOptionalField) {
+ Schema projected_schema({
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeRequired(2, "name", string()),
+ SchemaField::MakeOptional(3, "age", int32()), // Missing in source
+ SchemaField::MakeOptional(4, "email", string()), // Missing in source
+ });
+
+ Schema source_schema({
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeRequired(2, "name", string()),
+ });
+
+ const std::string input_json = R"([
+ {"id": 1, "name": "Person0"},
+ {"id": 2, "name": "Person1"}
+ ])";
+ const std::string expected_json = R"([
+ {"id": 1, "name": "Person0", "age": null, "email": null},
+ {"id": 2, "name": "Person1", "age": null, "email": null}
+ ])";
+
+ ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema,
source_schema,
+ input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, NestedStructWithMissingOptionalFields) {
+ Schema projected_schema({
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeRequired(
+ 2, "person",
+ std::make_shared<StructType>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(3, "name", string()),
+ SchemaField::MakeOptional(4, "age", int32()), // Missing
+ SchemaField::MakeOptional(5, "phone", string()), // Missing
+ })),
+ SchemaField::MakeOptional(6, "department", string()), // Missing
+ });
+
+ Schema source_schema({
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeRequired(2, "person",
+
std::make_shared<StructType>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(3, "name",
string()),
+ })),
+ });
+
+ const std::string input_json = R"([
+ {"id": 100, "person": {"name": "Employee0"}},
+ {"id": 101, "person": {"name": "Employee1"}}
+ ])";
+ const std::string expected_json = R"([
+ {"id": 100, "person": {"name": "Employee0", "age": null, "phone": null},
"department": null},
+ {"id": 101, "person": {"name": "Employee1", "age": null, "phone": null},
"department": null}
+ ])";
+
+ ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema,
source_schema,
+ input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, ListWithMissingOptionalElementFields) {
+ Schema projected_schema({
+ SchemaField::MakeRequired(
+ 1, "people",
+ std::make_shared<ListType>(SchemaField::MakeRequired(
+ 2, "element",
+ std::make_shared<StructType>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(3, "name", string()),
+ SchemaField::MakeOptional(4, "age", int32()), // Missing
in source
+ SchemaField::MakeOptional(5, "email", string()), // Missing
in source
+ })))),
+ });
+
+ Schema source_schema({
+ SchemaField::MakeRequired(1, "people",
+
std::make_shared<ListType>(SchemaField::MakeRequired(
+ 2, "element",
+
std::make_shared<StructType>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(3, "name",
string()),
+ })))),
+ });
+
+ const std::string input_json = R"([
+ {"people": [
+ {"name": "Person0_0"},
+ {"name": "Person0_1"}
+ ]},
+ {"people": [
+ {"name": "Person1_0"},
+ {"name": "Person1_1"}
+ ]}
+ ])";
+ const std::string expected_json = R"([
+ {"people": [
+ {"name": "Person0_0", "age": null, "email": null},
+ {"name": "Person0_1", "age": null, "email": null}
+ ]},
+ {"people": [
+ {"name": "Person1_0", "age": null, "email": null},
+ {"name": "Person1_1", "age": null, "email": null}
+ ]}
+ ])";
+
+ ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema,
source_schema,
+ input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, FieldReordering) {
+ Schema projected_schema({
+ SchemaField::MakeRequired(2, "name", string()),
+ SchemaField::MakeRequired(1, "id", int32()),
+ });
+
+ Schema source_schema({
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeRequired(2, "name", string()),
+ });
+
+ const std::string input_json = R"([
+ {"id": 1, "name": "Alice"},
+ {"id": 2, "name": "Bob"}
+ ])";
+ const std::string expected_json = R"([
+ {"name": "Alice", "id": 1},
+ {"name": "Bob", "id": 2}
+ ])";
+
+ ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema,
source_schema,
+ input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, FieldSubset) {
+ Schema projected_schema({
+ SchemaField::MakeRequired(2, "name", string()),
+ });
+
+ Schema source_schema({
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeRequired(2, "name", string()),
+ SchemaField::MakeRequired(3, "age", int32()),
+ });
+
+ const std::string input_json = R"([
+ {"id": 1, "name": "Alice", "age": 25},
+ {"id": 2, "name": "Bob", "age": 30}
+ ])";
+ const std::string expected_json = R"([
+ {"name": "Alice"},
+ {"name": "Bob"}
+ ])";
+
+ ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema,
source_schema,
+ input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, EmptyRecordBatch) {
+ Schema iceberg_schema({
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeRequired(2, "name", string()),
+ });
+
+ const std::string input_json = R"([])";
+
+ ASSERT_NO_FATAL_FAILURE(
+ VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json,
input_json));
+}
+
+} // namespace iceberg::parquet