This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 687ca03  feat(parquet): project arrow array for parquet (#165)
687ca03 is described below

commit 687ca032068a3232cc00699ec76cb49f74393354
Author: Gang Wu <[email protected]>
AuthorDate: Tue Aug 19 00:40:05 2025 +0800

    feat(parquet): project arrow array for parquet (#165)
---
 src/iceberg/parquet/parquet_data_util.cc         | 255 +++++++++++-
 src/iceberg/parquet/parquet_data_util_internal.h |  11 +-
 src/iceberg/parquet/parquet_reader.cc            |  11 +-
 test/CMakeLists.txt                              |   6 +-
 test/parquet_data_test.cc                        | 506 +++++++++++++++++++++++
 5 files changed, 774 insertions(+), 15 deletions(-)

diff --git a/src/iceberg/parquet/parquet_data_util.cc 
b/src/iceberg/parquet/parquet_data_util.cc
index 6237b00..c822daa 100644
--- a/src/iceberg/parquet/parquet_data_util.cc
+++ b/src/iceberg/parquet/parquet_data_util.cc
@@ -17,15 +17,266 @@
  * under the License.
  */
 
+#include <arrow/array.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/compute/api.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+
+#include "iceberg/arrow/arrow_error_transform_internal.h"
 #include "iceberg/parquet/parquet_data_util_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_util.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
 
 namespace iceberg::parquet {
 
+namespace {
+
+// Forward declaration
+Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
+    const std::shared_ptr<::arrow::Array>& array,
+    const std::shared_ptr<::arrow::DataType>& output_arrow_type,
+    const NestedType& nested_type, std::span<const FieldProjection> 
projections,
+    ::arrow::MemoryPool* pool);
+
+/// \brief Create a null array of the given type and length.
+Result<std::shared_ptr<::arrow::Array>> MakeNullArray(
+    const std::shared_ptr<::arrow::DataType>& type, int64_t length,
+    ::arrow::MemoryPool* pool) {
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto builder, ::arrow::MakeBuilder(type, 
pool));
+  ICEBERG_ARROW_RETURN_NOT_OK(builder->AppendNulls(length));
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto array, builder->Finish());
+  return array;
+}
+
+Result<std::shared_ptr<::arrow::Array>> ProjectPrimitiveArray(
+    const std::shared_ptr<::arrow::Array>& array,
+    const std::shared_ptr<::arrow::DataType>& output_arrow_type,
+    ::arrow::MemoryPool* pool) {
+  if (array->type()->Equals(output_arrow_type)) {
+    return array;
+  }
+
+  // Use Arrow compute cast function for type conversions.
+  // Note: We don't check the schema evolution rule again because projecting 
schemas
+  // has checked this.
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto cast_result,
+                                 ::arrow::compute::Cast(array, 
output_arrow_type));
+  return cast_result.make_array();
+}
+
+Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
+    const std::shared_ptr<::arrow::StructArray>& struct_array,
+    const std::shared_ptr<::arrow::StructType>& output_struct_type,
+    const StructType& struct_type, std::span<const FieldProjection> 
projections,
+    ::arrow::MemoryPool* pool) {
+  if (struct_type.fields().size() != projections.size()) {
+    return InvalidSchema(
+        "Inconsistent number of fields ({}) and number of projections ({})",
+        struct_type.fields().size(), projections.size());
+  }
+  if (struct_type.fields().size() != output_struct_type->num_fields()) {
+    return InvalidSchema(
+        "Inconsistent number of fields ({}) and number of output fields ({})",
+        struct_type.fields().size(), output_struct_type->num_fields());
+  }
+
+  std::vector<std::shared_ptr<::arrow::Array>> projected_arrays;
+  projected_arrays.reserve(projections.size());
+
+  for (size_t i = 0; i < projections.size(); ++i) {
+    const auto& projected_field = struct_type.fields()[i];
+    const auto& field_projection = projections[i];
+    const auto& output_arrow_type = output_struct_type->fields()[i]->type();
+
+    std::shared_ptr<::arrow::Array> projected_array;
+
+    if (field_projection.kind == FieldProjection::Kind::kProjected) {
+      auto parquet_field_index =
+          static_cast<int>(std::get<size_t>(field_projection.from));
+      if (parquet_field_index >= struct_array->num_fields()) {
+        return InvalidArgument("Parquet field index {} out of bound {}",
+                               parquet_field_index, 
struct_array->num_fields());
+      }
+      const auto& parquet_array = struct_array->field(parquet_field_index);
+      if (projected_field.type()->is_nested()) {
+        const auto& nested_type =
+            internal::checked_cast<const NestedType&>(*projected_field.type());
+        ICEBERG_ASSIGN_OR_RAISE(
+            projected_array,
+            ProjectNestedArray(parquet_array, output_arrow_type, nested_type,
+                               field_projection.children, pool));
+      } else {
+        ICEBERG_ASSIGN_OR_RAISE(
+            projected_array,
+            ProjectPrimitiveArray(parquet_array, output_arrow_type, pool));
+      }
+    } else if (field_projection.kind == FieldProjection::Kind::kNull) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          projected_array,
+          MakeNullArray(output_arrow_type, struct_array->length(), pool));
+    } else {
+      return NotImplemented("Unsupported field projection kind: {}",
+                            ToString(field_projection.kind));
+    }
+
+    projected_arrays.emplace_back(std::move(projected_array));
+  }
+
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(
+      auto output_array,
+      ::arrow::StructArray::Make(projected_arrays, 
output_struct_type->fields(),
+                                 struct_array->null_bitmap(), 
struct_array->null_count(),
+                                 struct_array->offset()));
+  return output_array;
+}
+
+/// FIXME: Support ::arrow::LargeListArray.
+Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
+    const std::shared_ptr<::arrow::ListArray>& list_array,
+    const std::shared_ptr<::arrow::ListType>& output_list_type, const 
ListType& list_type,
+    std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
+  if (projections.size() != 1) {
+    return InvalidArgument("Expected 1 projection for list, got: {}", 
projections.size());
+  }
+
+  const auto& element_field = list_type.fields().back();
+  const auto& element_projection = projections[0];
+  const auto& output_element_type = output_list_type->value_type();
+
+  std::shared_ptr<::arrow::Array> projected_values;
+  if (element_field.type()->is_nested()) {
+    const auto& nested_type =
+        internal::checked_cast<const NestedType&>(*element_field.type());
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_values,
+        ProjectNestedArray(list_array->values(), output_element_type, 
nested_type,
+                           element_projection.children, pool));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_values,
+        ProjectPrimitiveArray(list_array->values(), output_element_type, 
pool));
+  }
+
+  return std::make_shared<::arrow::ListArray>(
+      output_list_type, list_array->length(), list_array->value_offsets(),
+      std::move(projected_values), list_array->null_bitmap(), 
list_array->null_count(),
+      list_array->offset());
+}
+
+Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
+    const std::shared_ptr<::arrow::MapArray>& map_array,
+    const std::shared_ptr<::arrow::MapType>& output_map_type, const MapType& 
map_type,
+    std::span<const FieldProjection> projections, ::arrow::MemoryPool* pool) {
+  if (projections.size() != 2) {
+    return InvalidArgument("Expected 2 projections for map, got: {}", 
projections.size());
+  }
+
+  const auto& key_projection = projections[0];
+  const auto& value_projection = projections[1];
+  const auto& key_type = map_type.key().type();
+  const auto& value_type = map_type.value().type();
+
+  // Project keys
+  std::shared_ptr<::arrow::Array> projected_keys;
+  if (key_type->is_nested()) {
+    const auto& nested_type = internal::checked_cast<const 
NestedType&>(*key_type);
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_keys, ProjectNestedArray(map_array->keys(), 
output_map_type->key_type(),
+                                           nested_type, 
key_projection.children, pool));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_keys,
+        ProjectPrimitiveArray(map_array->keys(), output_map_type->key_type(), 
pool));
+  }
+
+  // Project values
+  std::shared_ptr<::arrow::Array> projected_items;
+  if (value_type->is_nested()) {
+    const auto& nested_type = internal::checked_cast<const 
NestedType&>(*value_type);
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_items,
+        ProjectNestedArray(map_array->items(), output_map_type->item_type(), 
nested_type,
+                           value_projection.children, pool));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(
+        projected_items,
+        ProjectPrimitiveArray(map_array->items(), 
output_map_type->item_type(), pool));
+  }
+
+  return std::make_shared<::arrow::MapArray>(
+      output_map_type, map_array->length(), map_array->value_offsets(),
+      std::move(projected_keys), std::move(projected_items), 
map_array->null_bitmap(),
+      map_array->null_count(), map_array->offset());
+}
+
+Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
+    const std::shared_ptr<::arrow::Array>& array,
+    const std::shared_ptr<::arrow::DataType>& output_arrow_type,
+    const NestedType& nested_type, std::span<const FieldProjection> 
projections,
+    ::arrow::MemoryPool* pool) {
+  switch (nested_type.type_id()) {
+    case TypeId::kStruct: {
+      if (output_arrow_type->id() != ::arrow::Type::STRUCT) {
+        return InvalidSchema("Expected struct type, got: {}",
+                             output_arrow_type->ToString());
+      }
+      auto struct_array = 
internal::checked_pointer_cast<::arrow::StructArray>(array);
+      auto output_struct_type =
+          
internal::checked_pointer_cast<::arrow::StructType>(output_arrow_type);
+      const auto& struct_type = internal::checked_cast<const 
StructType&>(nested_type);
+      return ProjectStructArray(struct_array, output_struct_type, struct_type,
+                                projections, pool);
+    }
+    case TypeId::kList: {
+      if (output_arrow_type->id() != ::arrow::Type::LIST) {
+        return InvalidSchema("Expected list type, got: {}",
+                             output_arrow_type->ToString());
+      }
+
+      auto list_array = 
internal::checked_pointer_cast<::arrow::ListArray>(array);
+      auto output_list_type =
+          internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
+      const auto& list_type = internal::checked_cast<const 
ListType&>(nested_type);
+      return ProjectListArray(list_array, output_list_type, list_type, 
projections, pool);
+    }
+    case TypeId::kMap: {
+      if (output_arrow_type->id() != ::arrow::Type::MAP) {
+        return InvalidSchema("Expected map type, got: {}", 
output_arrow_type->ToString());
+      }
+
+      auto map_array = 
internal::checked_pointer_cast<::arrow::MapArray>(array);
+      auto output_map_type =
+          internal::checked_pointer_cast<::arrow::MapType>(output_arrow_type);
+      const auto& map_type = internal::checked_cast<const 
MapType&>(nested_type);
+      return ProjectMapArray(map_array, output_map_type, map_type, 
projections, pool);
+    }
+    default:
+      return InvalidSchema("Cannot project array of unsupported nested type: 
{}",
+                           nested_type.ToString());
+  }
+}
+
+}  // namespace
+
 Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
     std::shared_ptr<::arrow::RecordBatch> record_batch,
     const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
-    const Schema& projected_schema, const SchemaProjection& projection) {
-  return NotImplemented("NYI");
+    const Schema& projected_schema, const SchemaProjection& projection,
+    ::arrow::MemoryPool* pool) {
+  auto array = std::make_shared<::arrow::StructArray>(
+      ::arrow::struct_(record_batch->schema()->fields()), 
record_batch->num_rows(),
+      record_batch->columns());
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto output_array,
+      ProjectNestedArray(array, 
::arrow::struct_(output_arrow_schema->fields()),
+                         projected_schema, projection.fields, pool));
+  auto* struct_array = 
internal::checked_cast<::arrow::StructArray*>(output_array.get());
+  return ::arrow::RecordBatch::Make(output_arrow_schema, 
record_batch->num_rows(),
+                                    struct_array->fields());
 }
 
 }  // namespace iceberg::parquet
diff --git a/src/iceberg/parquet/parquet_data_util_internal.h 
b/src/iceberg/parquet/parquet_data_util_internal.h
index c222d74..4a4f881 100644
--- a/src/iceberg/parquet/parquet_data_util_internal.h
+++ b/src/iceberg/parquet/parquet_data_util_internal.h
@@ -19,12 +19,9 @@
 
 #pragma once
 
-#include "iceberg/schema_util.h"
+#include <arrow/type_fwd.h>
 
-namespace arrow {
-class RecordBatch;
-class Schema;
-}  // namespace arrow
+#include "iceberg/schema_util.h"
 
 namespace iceberg::parquet {
 
@@ -34,10 +31,12 @@ namespace iceberg::parquet {
 /// \param output_arrow_schema The Arrow schema to convert to.
 /// \param projected_schema The projected Iceberg schema.
 /// \param projection The projection from projected Iceberg schema to the 
record batch.
+/// \param pool The arrow memory pool.
 /// \return The converted record batch.
 Result<std::shared_ptr<::arrow::RecordBatch>> ProjectRecordBatch(
     std::shared_ptr<::arrow::RecordBatch> record_batch,
     const std::shared_ptr<::arrow::Schema>& output_arrow_schema,
-    const Schema& projected_schema, const SchemaProjection& projection);
+    const Schema& projected_schema, const SchemaProjection& projection,
+    ::arrow::MemoryPool* pool);
 
 }  // namespace iceberg::parquet
diff --git a/src/iceberg/parquet/parquet_reader.cc 
b/src/iceberg/parquet/parquet_reader.cc
index 6903310..3f15986 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -117,11 +117,8 @@ class ParquetReader::Impl {
     split_ = options.split;
     read_schema_ = options.projection;
 
-    // TODO(gangwu): make memory pool configurable
-    ::arrow::MemoryPool* pool = ::arrow::default_memory_pool();
-
     // Prepare reader properties
-    ::parquet::ReaderProperties reader_properties(pool);
+    ::parquet::ReaderProperties reader_properties(pool_);
     ::parquet::ArrowReaderProperties arrow_reader_properties;
     arrow_reader_properties.set_batch_size(options.batch_size);
     arrow_reader_properties.set_arrow_extensions_enabled(true);
@@ -131,7 +128,7 @@ class ParquetReader::Impl {
     auto file_reader =
         ::parquet::ParquetFileReader::Open(std::move(input_stream), 
reader_properties);
     ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileReader::Make(
-        pool, std::move(file_reader), arrow_reader_properties, &reader_));
+        pool_, std::move(file_reader), arrow_reader_properties, &reader_));
 
     // Project read schema onto the Parquet file schema
     ICEBERG_ASSIGN_OR_RAISE(projection_, BuildProjection(reader_.get(), 
*read_schema_));
@@ -152,7 +149,7 @@ class ParquetReader::Impl {
 
     ICEBERG_ASSIGN_OR_RAISE(
         batch, ProjectRecordBatch(std::move(batch), 
context_->output_arrow_schema_,
-                                  *read_schema_, projection_));
+                                  *read_schema_, projection_, pool_));
 
     ArrowArray arrow_array;
     ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportRecordBatch(*batch, 
&arrow_array));
@@ -227,6 +224,8 @@ class ParquetReader::Impl {
   }
 
  private:
+  // TODO(gangwu): make memory pool configurable
+  ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
   // The split to read from the Parquet file.
   std::optional<Split> split_;
   // Schema to read from the Parquet file.
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 091fa29..be4afcb 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -117,5 +117,9 @@ if(ICEBERG_BUILD_BUNDLE)
                    test_common.cc
                    in_memory_catalog_test.cc)
 
-  add_iceberg_test(parquet_test USE_BUNDLE SOURCES parquet_schema_test.cc)
+  add_iceberg_test(parquet_test
+                   USE_BUNDLE
+                   SOURCES
+                   parquet_data_test.cc
+                   parquet_schema_test.cc)
 endif()
diff --git a/test/parquet_data_test.cc b/test/parquet_data_test.cc
new file mode 100644
index 0000000..bc8f421
--- /dev/null
+++ b/test/parquet_data_test.cc
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/parquet/parquet_data_util_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/schema_util.h"
+#include "iceberg/type.h"
+#include "matchers.h"
+
+namespace iceberg::parquet {
+
+struct ProjectRecordBatchParam {
+  std::string name;
+  std::shared_ptr<Type> projected_type;
+  std::shared_ptr<Type> source_type;
+  std::string input_json;
+  std::string expected_json;
+};
+
+std::shared_ptr<::arrow::RecordBatch> RecordBatchFromJson(
+    const std::shared_ptr<::arrow::Schema>& schema, const std::string& 
json_data) {
+  auto struct_type = ::arrow::struct_(schema->fields());
+  auto array = ::arrow::json::ArrayFromJSONString(struct_type, 
json_data).ValueOrDie();
+  auto struct_array = std::static_pointer_cast<::arrow::StructArray>(array);
+  return ::arrow::RecordBatch::Make(schema, struct_array->length(),
+                                    struct_array->fields());
+}
+
+void VerifyProjectRecordBatch(const Schema& projected_schema, const Schema& 
source_schema,
+                              const std::string& input_json,
+                              const std::string& expected_json) {
+  auto schema_projection_result =
+      Project(projected_schema, source_schema, /*prune_source=*/false);
+  ASSERT_THAT(schema_projection_result, IsOk());
+  auto schema_projection = std::move(schema_projection_result.value());
+
+  ArrowSchema source_arrow_c_schema;
+  ASSERT_THAT(ToArrowSchema(source_schema, &source_arrow_c_schema), IsOk());
+  auto source_arrow_schema = 
::arrow::ImportSchema(&source_arrow_c_schema).ValueOrDie();
+  auto input_record_batch = RecordBatchFromJson(source_arrow_schema, 
input_json);
+
+  ArrowSchema projected_arrow_c_schema;
+  ASSERT_THAT(ToArrowSchema(projected_schema, &projected_arrow_c_schema), 
IsOk());
+  auto projected_arrow_schema =
+      ::arrow::ImportSchema(&projected_arrow_c_schema).ValueOrDie();
+
+  auto project_result =
+      ProjectRecordBatch(input_record_batch, projected_arrow_schema, 
projected_schema,
+                         schema_projection, ::arrow::default_memory_pool());
+  ASSERT_THAT(project_result, IsOk());
+  auto projected_record_batch = std::move(project_result.value());
+
+  auto expected_record_batch = RecordBatchFromJson(projected_arrow_schema, 
expected_json);
+  ASSERT_TRUE(projected_record_batch->Equals(*expected_record_batch))
+      << "projected_record_batch: " << projected_record_batch->ToString()
+      << "\nexpected_record_batch: " << expected_record_batch->ToString();
+}
+
+class ProjectRecordBatchTest : public 
::testing::TestWithParam<ProjectRecordBatchParam> {
+};
+
+TEST_P(ProjectRecordBatchTest, PrimitiveType) {
+  const auto& test_case = GetParam();
+
+  Schema projected_schema({SchemaField::MakeRequired(
+      /*field_id=*/1, /*name=*/"a", test_case.projected_type)});
+  Schema source_schema({SchemaField::MakeRequired(
+      /*field_id=*/1, /*name=*/"a", test_case.source_type)});
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(
+      projected_schema, source_schema, test_case.input_json, 
test_case.expected_json));
+}
+
+const std::vector<ProjectRecordBatchParam> kPrimitiveTestCases = {
+    {
+        .name = "Boolean",
+        .projected_type = boolean(),
+        .source_type = boolean(),
+        .input_json = R"([{"a": true}, {"a": false}, {"a": true}])",
+        .expected_json = R"([{"a": true}, {"a": false}, {"a": true}])",
+    },
+    {
+        .name = "Int",
+        .projected_type = int32(),
+        .source_type = int32(),
+        .input_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
+        .expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
+    },
+    {
+        .name = "Long",
+        .projected_type = int64(),
+        .source_type = int64(),
+        .input_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
+        .expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
+    },
+    {
+        .name = "Float",
+        .projected_type = float32(),
+        .source_type = float32(),
+        .input_json = R"([{"a": 0.0}, {"a": 3.14}, {"a": 6.28}])",
+        .expected_json = R"([{"a": 0.0}, {"a": 3.14}, {"a": 6.28}])",
+    },
+    {
+        .name = "Double",
+        .projected_type = float64(),
+        .source_type = float64(),
+        .input_json = R"([{"a": 0.0}, {"a": 1.234567890}, {"a": 
2.469135780}])",
+        .expected_json = R"([{"a": 0.0}, {"a": 1.234567890}, {"a": 
2.469135780}])",
+    },
+    {
+        .name = "String",
+        .projected_type = string(),
+        .source_type = string(),
+        .input_json =
+            R"([{"a": "test_string_0"}, {"a": "test_string_1"}, {"a": 
"test_string_2"}])",
+        .expected_json =
+            R"([{"a": "test_string_0"}, {"a": "test_string_1"}, {"a": 
"test_string_2"}])",
+    },
+    {
+        .name = "Binary",
+        .projected_type = binary(),
+        .source_type = binary(),
+        .input_json = R"([{"a": "abc"}, {"a": "bcd"}, {"a": "cde"}])",
+        .expected_json = R"([{"a": "abc"}, {"a": "bcd"}, {"a": "cde"}])",
+    },
+    {
+        .name = "Fixed",
+        .projected_type = fixed(4),
+        .source_type = fixed(4),
+        .input_json = R"([{"a": "abcd"}, {"a": "bcde"}, {"a": "cdef"}])",
+        .expected_json = R"([{"a": "abcd"}, {"a": "bcde"}, {"a": "cdef"}])",
+    },
+    {
+        .name = "Decimal",
+        .projected_type = decimal(10, 2),
+        .source_type = decimal(10, 2),
+        .input_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
+        .expected_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
+    },
+    {
+        .name = "Date",
+        .projected_type = date(),
+        .source_type = date(),
+        .input_json = R"([{"a": 18000}, {"a": 18001}, {"a": 18002}])",
+        .expected_json = R"([{"a": 18000}, {"a": 18001}, {"a": 18002}])",
+    },
+    {
+        .name = "Time",
+        .projected_type = time(),
+        .source_type = time(),
+        .input_json = R"([{"a": 45045123456}, {"a": 45046123456}, {"a": 
45047123456}])",
+        .expected_json =
+            R"([{"a": 45045123456}, {"a": 45046123456}, {"a": 45047123456}])",
+    },
+    {
+        .name = "Timestamp",
+        .projected_type = timestamp(),
+        .source_type = timestamp(),
+        .input_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
+        .expected_json = R"([{"a": 0}, {"a": 1000000}, {"a": 2000000}])",
+    },
+    {
+        .name = "TimestampTz",
+        .projected_type = timestamp_tz(),
+        .source_type = timestamp_tz(),
+        .input_json =
+            R"([{"a": 1672531200000000}, {"a": 1672531201000000}, {"a": 
1672531202000000}])",
+        .expected_json =
+            R"([{"a": 1672531200000000}, {"a": 1672531201000000}, {"a": 
1672531202000000}])",
+    },
+    {
+        .name = "IntToLongPromotion",
+        .projected_type = int64(),
+        .source_type = int32(),
+        .input_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
+        .expected_json = R"([{"a": 0}, {"a": 100}, {"a": 200}])",
+    },
+    {
+        .name = "FloatToDoublePromotion",
+        .projected_type = float64(),
+        .source_type = float32(),
+        .input_json = R"([{"a": 0.0}, {"a": 1.0}, {"a": 2.0}])",
+        .expected_json = R"([{"a": 0.0}, {"a": 1.0}, {"a": 2.0}])",
+    },
+    {
+        .name = "DecimalPrecisionPromotion",
+        .projected_type = decimal(10, 2),
+        .source_type = decimal(6, 2),
+        .input_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
+        .expected_json = R"([{"a": "0.00"}, {"a": "10.01"}, {"a": "20.02"}])",
+    },
+};
+
+INSTANTIATE_TEST_SUITE_P(
+    AllPrimitiveTypes, ProjectRecordBatchTest, 
::testing::ValuesIn(kPrimitiveTestCases),
+    [](const ::testing::TestParamInfo<ProjectRecordBatchParam>& info) {
+      return info.param.name;
+    });
+
+TEST(ProjectRecordBatchTest, StructWithTwoFields) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+  });
+
+  const std::string input_json = R"([{"id": 42, "name": "test"}])";
+  const std::string expected_json = R"([{"id": 42, "name": "test"}])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(iceberg_schema, 
iceberg_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, NestedStruct) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "person",
+                                
std::make_shared<StructType>(std::vector<SchemaField>{
+                                    SchemaField::MakeRequired(3, "name", 
string()),
+                                    SchemaField::MakeRequired(4, "age", 
int32()),
+                                })),
+  });
+
+  const std::string input_json = R"([
+    {"id": 1, "person": {"name": "Person0", "age": 25}},
+    {"id": 2, "person": {"name": "Person1", "age": 26}}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, 
input_json));
+}
+
+TEST(ProjectRecordBatchTest, ListOfIntegers) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(
+          1, "numbers",
+          std::make_shared<ListType>(SchemaField::MakeRequired(2, "element", 
int32()))),
+  });
+
+  const std::string input_json = R"([
+    {"numbers": [0, 1, 2]},
+    {"numbers": [10, 11, 12]}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, 
input_json));
+}
+
+TEST(ProjectRecordBatchTest, ListOfStructs) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(1, "people",
+                                
std::make_shared<ListType>(SchemaField::MakeRequired(
+                                    2, "element",
+                                    
std::make_shared<StructType>(std::vector<SchemaField>{
+                                        SchemaField::MakeRequired(3, "name", 
string()),
+                                        SchemaField::MakeRequired(4, "age", 
int32()),
+                                    })))),
+  });
+
+  const std::string input_json = R"([
+    {"people": [
+      {"name": "Person0_0", "age": 20},
+      {"name": "Person0_1", "age": 21}
+    ]},
+    {"people": [
+      {"name": "Person1_0", "age": 30},
+      {"name": "Person1_1", "age": 31}
+    ]}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, 
input_json));
+}
+
+TEST(ProjectRecordBatchTest, MapStringToInt) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(
+          1, "scores",
+          std::make_shared<MapType>(SchemaField::MakeRequired(2, "key", 
string()),
+                                    SchemaField::MakeRequired(3, "value", 
int32()))),
+  });
+
+  const std::string input_json = R"([
+    {"scores": [["score_0", 100], ["score_1", 105]]},
+    {"scores": [["score_2", 110], ["score_3", 115]]}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, 
input_json));
+}
+
+TEST(ProjectRecordBatchTest, MapStringToStruct) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(
+          1, "users",
+          std::make_shared<MapType>(
+              SchemaField::MakeRequired(2, "key", string()),
+              SchemaField::MakeRequired(
+                  3, "value",
+                  std::make_shared<StructType>(std::vector<SchemaField>{
+                      SchemaField::MakeRequired(4, "id", int32()),
+                      SchemaField::MakeRequired(5, "email", string()),
+                  })))),
+  });
+
+  const std::string input_json = R"([
+    {"users": [["user_0", {"id": 1000, "email": "[email protected]"}]]},
+    {"users": [["user_1", {"id": 1001, "email": "[email protected]"}]]}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, 
input_json));
+}
+
+TEST(ProjectRecordBatchTest, StructWithMissingOptionalField) {
+  Schema projected_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+      SchemaField::MakeOptional(3, "age", int32()),     // Missing in source
+      SchemaField::MakeOptional(4, "email", string()),  // Missing in source
+  });
+
+  Schema source_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+  });
+
+  const std::string input_json = R"([
+    {"id": 1, "name": "Person0"},
+    {"id": 2, "name": "Person1"}
+  ])";
+  const std::string expected_json = R"([
+    {"id": 1, "name": "Person0", "age": null, "email": null},
+    {"id": 2, "name": "Person1", "age": null, "email": null}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, 
source_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, NestedStructWithMissingOptionalFields) {
+  Schema projected_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(
+          2, "person",
+          std::make_shared<StructType>(std::vector<SchemaField>{
+              SchemaField::MakeRequired(3, "name", string()),
+              SchemaField::MakeOptional(4, "age", int32()),     // Missing
+              SchemaField::MakeOptional(5, "phone", string()),  // Missing
+          })),
+      SchemaField::MakeOptional(6, "department", string()),  // Missing
+  });
+
+  Schema source_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "person",
+                                
std::make_shared<StructType>(std::vector<SchemaField>{
+                                    SchemaField::MakeRequired(3, "name", 
string()),
+                                })),
+  });
+
+  const std::string input_json = R"([
+    {"id": 100, "person": {"name": "Employee0"}},
+    {"id": 101, "person": {"name": "Employee1"}}
+  ])";
+  const std::string expected_json = R"([
+    {"id": 100, "person": {"name": "Employee0", "age": null, "phone": null}, 
"department": null},
+    {"id": 101, "person": {"name": "Employee1", "age": null, "phone": null}, 
"department": null}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, 
source_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, ListWithMissingOptionalElementFields) {
+  Schema projected_schema({
+      SchemaField::MakeRequired(
+          1, "people",
+          std::make_shared<ListType>(SchemaField::MakeRequired(
+              2, "element",
+              std::make_shared<StructType>(std::vector<SchemaField>{
+                  SchemaField::MakeRequired(3, "name", string()),
+                  SchemaField::MakeOptional(4, "age", int32()),     // Missing 
in source
+                  SchemaField::MakeOptional(5, "email", string()),  // Missing 
in source
+              })))),
+  });
+
+  Schema source_schema({
+      SchemaField::MakeRequired(1, "people",
+                                
std::make_shared<ListType>(SchemaField::MakeRequired(
+                                    2, "element",
+                                    
std::make_shared<StructType>(std::vector<SchemaField>{
+                                        SchemaField::MakeRequired(3, "name", 
string()),
+                                    })))),
+  });
+
+  const std::string input_json = R"([
+    {"people": [
+      {"name": "Person0_0"},
+      {"name": "Person0_1"}
+    ]},
+    {"people": [
+      {"name": "Person1_0"},
+      {"name": "Person1_1"}
+    ]}
+  ])";
+  const std::string expected_json = R"([
+    {"people": [
+      {"name": "Person0_0", "age": null, "email": null},
+      {"name": "Person0_1", "age": null, "email": null}
+    ]},
+    {"people": [
+      {"name": "Person1_0", "age": null, "email": null},
+      {"name": "Person1_1", "age": null, "email": null}
+    ]}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, 
source_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, FieldReordering) {
+  Schema projected_schema({
+      SchemaField::MakeRequired(2, "name", string()),
+      SchemaField::MakeRequired(1, "id", int32()),
+  });
+
+  Schema source_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+  });
+
+  const std::string input_json = R"([
+    {"id": 1, "name": "Alice"},
+    {"id": 2, "name": "Bob"}
+  ])";
+  const std::string expected_json = R"([
+    {"name": "Alice", "id": 1},
+    {"name": "Bob", "id": 2}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, 
source_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, FieldSubset) {
+  Schema projected_schema({
+      SchemaField::MakeRequired(2, "name", string()),
+  });
+
+  Schema source_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+      SchemaField::MakeRequired(3, "age", int32()),
+  });
+
+  const std::string input_json = R"([
+    {"id": 1, "name": "Alice", "age": 25},
+    {"id": 2, "name": "Bob", "age": 30}
+  ])";
+  const std::string expected_json = R"([
+    {"name": "Alice"},
+    {"name": "Bob"}
+  ])";
+
+  ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, 
source_schema,
+                                                   input_json, expected_json));
+}
+
+TEST(ProjectRecordBatchTest, EmptyRecordBatch) {
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeRequired(2, "name", string()),
+  });
+
+  const std::string input_json = R"([])";
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, 
input_json));
+}
+
+}  // namespace iceberg::parquet


Reply via email to