This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 08da3a9  feat: add avro schema projection (#109)
08da3a9 is described below

commit 08da3a962fc518d9458366bd13e98e9ff1e101e6
Author: Gang Wu <[email protected]>
AuthorDate: Mon May 26 14:08:30 2025 +0800

    feat: add avro schema projection (#109)
---
 src/iceberg/avro/avro_schema_util.cc         | 402 +++++++++++++++++++
 src/iceberg/avro/avro_schema_util_internal.h |  14 +
 src/iceberg/schema_util.cc                   |  21 +-
 src/iceberg/schema_util_internal.h           |  47 +++
 test/avro_schema_test.cc                     | 554 +++++++++++++++++++++++++++
 5 files changed, 1018 insertions(+), 20 deletions(-)

diff --git a/src/iceberg/avro/avro_schema_util.cc 
b/src/iceberg/avro/avro_schema_util.cc
index 37a34d7..229c62b 100644
--- a/src/iceberg/avro/avro_schema_util.cc
+++ b/src/iceberg/avro/avro_schema_util.cc
@@ -31,6 +31,10 @@
 #include <avro/ValidSchema.hh>
 
 #include "iceberg/avro/avro_schema_util_internal.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_util_internal.h"
+#include "iceberg/util/formatter.h"
 #include "iceberg/util/macros.h"
 #include "iceberg/util/visit_type.h"
 
@@ -377,4 +381,402 @@ Status HasIdVisitor::Visit(const ::avro::ValidSchema& 
schema) {
 
 Status HasIdVisitor::Visit(const ::avro::Schema& schema) { return 
Visit(schema.root()); }
 
+namespace {
+
+// Renders an Avro schema node as text using the node's stream insertion operator.
+std::string ToString(const ::avro::NodePtr& node) {
+  std::ostringstream rendered;
+  rendered << *node;
+  return rendered.str();
+}
+
+// Renders an Avro logical type as the text emitted by LogicalType::printJson().
+std::string ToString(const ::avro::LogicalType& logical_type) {
+  std::ostringstream json;
+  logical_type.printJson(json);
+  return json.str();
+}
+
+// Convenience overload: wraps the enum value in a LogicalType and renders that.
+std::string ToString(const ::avro::LogicalType::Type& logical_type) {
+  const ::avro::LogicalType wrapped(logical_type);
+  return ToString(wrapped);
+}
+
+// Returns true when the logical type tag of `node` equals `expected_type`.
+bool HasLogicalType(const ::avro::NodePtr& node,
+                    ::avro::LogicalType::Type expected_type) {
+  const auto actual_type = node->logicalType().type();
+  return actual_type == expected_type;
+}
+
+// Returns true when `node` carries a custom logical type whose name is "map".
+bool HasMapLogicalType(const ::avro::NodePtr& node) {
+  const auto logical = node->logicalType();
+  if (logical.type() != ::avro::LogicalType::CUSTOM) {
+    return false;
+  }
+  const auto custom = logical.customLogicalType();
+  return custom != nullptr && custom->name() == "map";
+}
+
+// Looks up the kAdjustToUtcProp attribute in the node's first custom-attribute set.
+// Yields std::nullopt when the node has no custom attributes at all.
+std::optional<std::string> GetAdjustToUtc(const ::avro::NodePtr& node) {
+  const bool has_attributes = node->customAttributes() != 0;
+  if (!has_attributes) {
+    return std::nullopt;
+  }
+  const std::string key(kAdjustToUtcProp);
+  return node->customAttributesAt(0).getAttribute(key);
+}
+
+// Reads the integer id stored under `attr_name` in the custom-attribute set at
+// index `field_idx` of `node`.
+//
+// Returns InvalidSchema when `field_idx` is out of range, when the attribute is
+// absent, or when its value is not a well-formed integer. The whole value must be
+// consumed by the parse, so an id such as "12abc" is rejected instead of being
+// silently truncated to 12.
+Result<int32_t> GetId(const ::avro::NodePtr& node, const std::string& attr_name,
+                      size_t field_idx) {
+  if (field_idx >= node->customAttributes()) {
+    return InvalidSchema("Field index {} exceeds available custom attributes {}",
+                         field_idx, node->customAttributes());
+  }
+
+  auto id_str = node->customAttributesAt(field_idx).getAttribute(attr_name);
+  if (!id_str.has_value()) {
+    return InvalidSchema("Missing avro attribute: {}", attr_name);
+  }
+
+  try {
+    size_t parsed_chars = 0;
+    const int32_t id = std::stoi(id_str.value(), &parsed_chars);
+    // std::stoi stops at the first non-numeric character; require full consumption.
+    if (parsed_chars == id_str->size()) {
+      return id;
+    }
+  } catch (const std::exception&) {
+    // Not a number, or out of range for int; reported below.
+  }
+  return InvalidSchema("Invalid {}: {}", attr_name, id_str.value());
+}
+
+// Reads the kElementIdProp attribute from the node's first custom-attribute set.
+Result<int32_t> GetElementId(const ::avro::NodePtr& node) {
+  constexpr size_t kAttributeSlot = 0;
+  static const std::string attr_name(kElementIdProp);
+  return GetId(node, attr_name, kAttributeSlot);
+}
+
+// Reads the kKeyIdProp attribute from the node's first custom-attribute set.
+Result<int32_t> GetKeyId(const ::avro::NodePtr& node) {
+  constexpr size_t kAttributeSlot = 0;
+  static const std::string attr_name(kKeyIdProp);
+  return GetId(node, attr_name, kAttributeSlot);
+}
+
+// Reads the kValueIdProp attribute from the node's first custom-attribute set.
+Result<int32_t> GetValueId(const ::avro::NodePtr& node) {
+  constexpr size_t kAttributeSlot = 0;
+  static const std::string attr_name(kValueIdProp);
+  return GetId(node, attr_name, kAttributeSlot);
+}
+
+// Reads the kFieldIdProp attribute from the custom-attribute set at `field_idx`.
+Result<int32_t> GetFieldId(const ::avro::NodePtr& node, size_t field_idx) {
+  static const std::string attr_name(kFieldIdProp);
+  return GetId(node, attr_name, field_idx);
+}
+
+// Checks that a primitive Avro node can be read as the expected Iceberg type.
+//
+// Besides exact matches, these promotions are accepted: int -> long,
+// float -> double, and a decimal whose scale is unchanged and whose precision is
+// not larger than the expected one. Every other combination, including any
+// expected type not listed below, yields InvalidSchema.
+//
+// Timestamps are told apart by the adjust-to-utc attribute: kTimestamp requires
+// it to be "false" (or absent), kTimestampTz requires it to be "true".
+Status ValidateAvroSchemaEvolution(const Type& expected_type,
+                                   const ::avro::NodePtr& avro_node) {
+  const auto avro_type = avro_node->type();
+  switch (expected_type.type_id()) {
+    case TypeId::kBoolean:
+      if (avro_type == ::avro::AVRO_BOOL) {
+        return {};
+      }
+      break;
+    case TypeId::kInt:
+      if (avro_type == ::avro::AVRO_INT) {
+        return {};
+      }
+      break;
+    case TypeId::kLong:
+      // int -> long promotion is allowed.
+      if (avro_type == ::avro::AVRO_LONG || avro_type == ::avro::AVRO_INT) {
+        return {};
+      }
+      break;
+    case TypeId::kFloat:
+      if (avro_type == ::avro::AVRO_FLOAT) {
+        return {};
+      }
+      break;
+    case TypeId::kDouble:
+      // float -> double promotion is allowed.
+      if (avro_type == ::avro::AVRO_DOUBLE || avro_type == ::avro::AVRO_FLOAT) {
+        return {};
+      }
+      break;
+    case TypeId::kDate:
+      if (avro_type == ::avro::AVRO_INT &&
+          HasLogicalType(avro_node, ::avro::LogicalType::DATE)) {
+        return {};
+      }
+      break;
+    case TypeId::kTime:
+      if (avro_type == ::avro::AVRO_LONG &&
+          HasLogicalType(avro_node, ::avro::LogicalType::TIME_MICROS)) {
+        return {};
+      }
+      break;
+    case TypeId::kTimestamp:
+      // Timestamp without zone: adjust-to-utc must be "false" or missing.
+      if (avro_type == ::avro::AVRO_LONG &&
+          HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_MICROS) &&
+          GetAdjustToUtc(avro_node).value_or("false") == "false") {
+        return {};
+      }
+      break;
+    case TypeId::kTimestampTz:
+      // Timestamp with zone: adjust-to-utc must be explicitly "true".
+      if (avro_type == ::avro::AVRO_LONG &&
+          HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_MICROS) &&
+          GetAdjustToUtc(avro_node).value_or("false") == "true") {
+        return {};
+      }
+      break;
+    case TypeId::kString:
+      if (avro_type == ::avro::AVRO_STRING) {
+        return {};
+      }
+      break;
+    case TypeId::kDecimal:
+      if (avro_type == ::avro::AVRO_FIXED &&
+          HasLogicalType(avro_node, ::avro::LogicalType::DECIMAL)) {
+        const auto& decimal_type =
+            internal::checked_cast<const DecimalType&>(expected_type);
+        const auto logical_type = avro_node->logicalType();
+        // Same scale; the expected precision may only be equal or wider.
+        if (decimal_type.scale() == logical_type.scale() &&
+            decimal_type.precision() >= logical_type.precision()) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kUuid:
+      if (avro_type == ::avro::AVRO_FIXED && avro_node->fixedSize() == 16 &&
+          HasLogicalType(avro_node, ::avro::LogicalType::UUID)) {
+        return {};
+      }
+      break;
+    case TypeId::kFixed:
+      if (avro_type == ::avro::AVRO_FIXED &&
+          avro_node->fixedSize() ==
+              internal::checked_cast<const FixedType&>(expected_type).length()) {
+        return {};
+      }
+      break;
+    case TypeId::kBinary:
+      if (avro_type == ::avro::AVRO_BYTES) {
+        return {};
+      }
+      break;
+    default:
+      break;
+  }
+
+  return InvalidSchema("Cannot read Iceberg type: {} from Avro type: {}", expected_type,
+                       ToString(avro_node));
+}
+
+// Strips an optional wrapper from an Avro node and stores the outcome in `*result`.
+//
+// A non-union node is passed through unchanged. A union must have exactly two
+// branches; if the first branch is null the second is stored, otherwise if the
+// second branch is null the first is stored. Any other union is InvalidSchema.
+//
+// XXX: Result<::avro::NodePtr> leads to unresolved external symbol error on Windows,
+// hence the out-parameter.
+Status UnwrapUnion(const ::avro::NodePtr& node, ::avro::NodePtr* result) {
+  const bool is_union = node->type() == ::avro::AVRO_UNION;
+  if (!is_union) {
+    *result = node;
+    return {};
+  }
+  if (node->leaves() != 2) {
+    return InvalidSchema("Union type must have exactly two branches");
+  }
+
+  const auto first = node->leafAt(0);
+  const auto second = node->leafAt(1);
+  if (first->type() == ::avro::AVRO_NULL) {
+    *result = second;
+    return {};
+  }
+  if (second->type() == ::avro::AVRO_NULL) {
+    *result = first;
+    return {};
+  }
+  return InvalidSchema("Union type must have exactly one null branch, got {}",
+                       ToString(node));
+}
+
+// Forward declaration
+Result<FieldProjection> ProjectNested(const Type& expected_type,
+                                      const ::avro::NodePtr& avro_node,
+                                      bool prune_source);
+
+// Builds the projection of an expected Iceberg struct onto an Avro record.
+//
+// Avro fields are matched to expected fields by field id. An expected field with no
+// Avro counterpart becomes kMetadata (metadata column) or kNull (optional field); a
+// missing required field is an InvalidSchema error, as is a non-record node or a
+// duplicate field id in the record. When `prune_source` is set,
+// PruneFieldProjection() is applied to the finished projection.
+Result<FieldProjection> ProjectStruct(const StructType& struct_type,
+                                      const ::avro::NodePtr& avro_node,
+                                      bool prune_source) {
+  if (avro_node->type() != ::avro::AVRO_RECORD) {
+    return InvalidSchema("Expected AVRO_RECORD type, but got {}", ToString(avro_node));
+  }
+
+  // Position and schema node of one field of the Avro record.
+  struct AvroField {
+    size_t position;
+    ::avro::NodePtr node;
+  };
+
+  // Index the record's fields by their field id.
+  const size_t avro_field_count = avro_node->leaves();
+  std::unordered_map<int32_t, AvroField> avro_fields_by_id;
+  avro_fields_by_id.reserve(avro_field_count);
+  for (size_t pos = 0; pos < avro_field_count; ++pos) {
+    ICEBERG_ASSIGN_OR_RAISE(int32_t avro_field_id, GetFieldId(avro_node, pos));
+    AvroField avro_field{pos, avro_node->leafAt(pos)};
+    const bool is_new_id =
+        avro_fields_by_id.try_emplace(avro_field_id, std::move(avro_field)).second;
+    if (!is_new_id) [[unlikely]] {
+      return InvalidSchema("Duplicate field id found in Avro schema: {}", avro_field_id);
+    }
+  }
+
+  const auto& expected_fields = struct_type.fields();
+  FieldProjection projection;
+  projection.children.reserve(expected_fields.size());
+
+  for (const auto& expected : expected_fields) {
+    const int32_t expected_id = expected.field_id();
+    FieldProjection child;
+
+    const auto match = avro_fields_by_id.find(expected_id);
+    if (match == avro_fields_by_id.cend()) {
+      // Not present in the file: only metadata columns and optional fields may be absent.
+      if (MetadataColumns::IsMetadataColumn(expected_id)) {
+        child.kind = FieldProjection::Kind::kMetadata;
+      } else if (expected.optional()) {
+        child.kind = FieldProjection::Kind::kNull;
+      } else {
+        return InvalidSchema("Missing required field with ID: {}", expected_id);
+      }
+    } else {
+      const AvroField& source = match->second;
+      ::avro::NodePtr unwrapped;
+      ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(source.node, &unwrapped));
+      if (expected.type()->is_nested()) {
+        ICEBERG_ASSIGN_OR_RAISE(child,
+                                ProjectNested(*expected.type(), unwrapped, prune_source));
+      } else {
+        ICEBERG_RETURN_UNEXPECTED(
+            ValidateAvroSchemaEvolution(*expected.type(), unwrapped));
+      }
+      // Set after the nested assignment so these values are not overwritten.
+      child.from = source.position;
+      child.kind = FieldProjection::Kind::kProjected;
+    }
+
+    projection.children.emplace_back(std::move(child));
+  }
+
+  if (prune_source) {
+    PruneFieldProjection(projection);
+  }
+
+  return projection;
+}
+
+// Builds the projection of an expected Iceberg list onto an Avro array.
+//
+// The array must have exactly one child and its element id must equal the id of
+// the expected element field; otherwise InvalidSchema is returned. The returned
+// projection has a single child describing the element.
+Result<FieldProjection> ProjectList(const ListType& list_type,
+                                    const ::avro::NodePtr& avro_node, bool prune_source) {
+  if (avro_node->type() != ::avro::AVRO_ARRAY) {
+    return InvalidSchema("Expected AVRO_ARRAY type, but got {}", ToString(avro_node));
+  }
+  if (avro_node->leaves() != 1) {
+    return InvalidSchema("Array type must have exactly one node, got {}",
+                         avro_node->leaves());
+  }
+
+  const auto& expected_element_field = list_type.fields().back();
+  ICEBERG_ASSIGN_OR_RAISE(int32_t avro_element_id, GetElementId(avro_node));
+  if (expected_element_field.field_id() != avro_element_id) [[unlikely]] {
+    return InvalidSchema("element-id mismatch, expected {}, got {}",
+                         expected_element_field.field_id(), avro_element_id);
+  }
+
+  FieldProjection element_projection;
+  ::avro::NodePtr element_node;
+  ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node->leafAt(0), &element_node));
+  if (expected_element_field.type()->is_nested()) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        element_projection,
+        ProjectNested(*expected_element_field.type(), element_node, prune_source));
+  } else {
+    ICEBERG_RETURN_UNEXPECTED(
+        ValidateAvroSchemaEvolution(*expected_element_field.type(), element_node));
+  }
+
+  // Tag the element as read from the array's only child, the same way struct and
+  // map projections tag their children. Done after the nested assignment so the
+  // nested children are kept.
+  element_projection.kind = FieldProjection::Kind::kProjected;
+  element_projection.from = size_t{0};
+
+  FieldProjection result;
+  result.children.emplace_back(std::move(element_projection));
+  return result;
+}
+
+// Builds the projection of an expected Iceberg map onto an Avro node.
+//
+// Two Avro encodings are accepted:
+//   - a native Avro map with two children, whose ids come from the key-id and
+//     value-id attributes of the map node;
+//   - an array carrying the custom "map" logical type whose single child is a
+//     record with two fields, whose ids come from the field ids of that record.
+// The ids must equal those of the expected key and value fields. The returned
+// projection has one child per leaf, in leaf order, each marked kProjected.
+// NOTE(review): pairing leaf i with map_type.fields()[i] assumes fields() is
+// ordered key, value - confirm against MapType.
+Result<FieldProjection> ProjectMap(const MapType& map_type,
+                                   const ::avro::NodePtr& avro_node, bool prune_source) {
+  const auto& expected_key_field = map_type.key();
+  const auto& expected_value_field = map_type.value();
+
+  int32_t avro_key_id = 0;
+  int32_t avro_value_id = 0;
+  ::avro::NodePtr map_node;
+
+  if (avro_node->type() == ::avro::AVRO_MAP) {
+    if (avro_node->leaves() != 2) {
+      return InvalidSchema("Map type must have exactly two nodes, got {}",
+                           avro_node->leaves());
+    }
+    map_node = avro_node;
+
+    ICEBERG_ASSIGN_OR_RAISE(avro_key_id, GetKeyId(avro_node));
+    ICEBERG_ASSIGN_OR_RAISE(avro_value_id, GetValueId(avro_node));
+  } else if (avro_node->type() == ::avro::AVRO_ARRAY && HasMapLogicalType(avro_node)) {
+    if (avro_node->leaves() != 1) {
+      return InvalidSchema("Array-backed map type must have exactly one node, got {}",
+                           avro_node->leaves());
+    }
+
+    map_node = avro_node->leafAt(0);
+    if (map_node->type() != ::avro::AVRO_RECORD || map_node->leaves() != 2) {
+      return InvalidSchema(
+          "Array-backed map type must have a record node with two fields");
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(avro_key_id, GetFieldId(map_node, 0));
+    ICEBERG_ASSIGN_OR_RAISE(avro_value_id, GetFieldId(map_node, 1));
+  } else {
+    return InvalidSchema("Expected a map type, but got Avro type {}",
+                         ToString(avro_node));
+  }
+
+  if (expected_key_field.field_id() != avro_key_id) {
+    return InvalidSchema("key-id mismatch, expected {}, got {}",
+                         expected_key_field.field_id(), avro_key_id);
+  }
+  if (expected_value_field.field_id() != avro_value_id) {
+    return InvalidSchema("value-id mismatch, expected {}, got {}",
+                         expected_value_field.field_id(), avro_value_id);
+  }
+
+  FieldProjection result;
+  for (size_t i = 0; i < map_node->leaves(); ++i) {
+    FieldProjection sub_projection;
+    ::avro::NodePtr sub_node;
+    ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(map_node->leafAt(i), &sub_node));
+    const auto& expected_sub_field = map_type.fields()[i];
+    if (expected_sub_field.type()->is_nested()) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          sub_projection,
+          ProjectNested(*expected_sub_field.type(), sub_node, prune_source));
+    } else {
+      ICEBERG_RETURN_UNEXPECTED(
+          ValidateAvroSchemaEvolution(*expected_sub_field.type(), sub_node));
+    }
+    // Set after the nested assignment so these values are not overwritten.
+    sub_projection.kind = FieldProjection::Kind::kProjected;
+    sub_projection.from = i;
+    result.children.emplace_back(std::move(sub_projection));
+  }
+
+  return result;
+}
+
+// Dispatches projection of a nested expected type (struct, list or map) to the
+// matching helper. A non-nested expected type, or a nested type other than those
+// three, is reported as InvalidSchema.
+Result<FieldProjection> ProjectNested(const Type& expected_type,
+                                      const ::avro::NodePtr& avro_node,
+                                      bool prune_source) {
+  if (!expected_type.is_nested()) {
+    return InvalidSchema("Expected a nested type, but got {}", expected_type);
+  }
+
+  const auto type_id = expected_type.type_id();
+  if (type_id == TypeId::kStruct) {
+    const auto& as_struct = internal::checked_cast<const StructType&>(expected_type);
+    return ProjectStruct(as_struct, avro_node, prune_source);
+  }
+  if (type_id == TypeId::kList) {
+    const auto& as_list = internal::checked_cast<const ListType&>(expected_type);
+    return ProjectList(as_list, avro_node, prune_source);
+  }
+  if (type_id == TypeId::kMap) {
+    const auto& as_map = internal::checked_cast<const MapType&>(expected_type);
+    return ProjectMap(as_map, avro_node, prune_source);
+  }
+  return InvalidSchema("Unsupported nested type: {}", expected_type);
+}
+
+}  // namespace
+
+// Projects the expected schema, viewed as its Type base, onto the Avro node and
+// returns the children of the resulting root projection as the schema projection.
+Result<SchemaProjection> Project(const Schema& expected_schema,
+                                 const ::avro::NodePtr& avro_node, bool prune_source) {
+  const Type& root_type = expected_schema;
+  ICEBERG_ASSIGN_OR_RAISE(auto root_projection,
+                          ProjectNested(root_type, avro_node, prune_source));
+  return SchemaProjection{std::move(root_projection.children)};
+}
+
 }  // namespace iceberg::avro
diff --git a/src/iceberg/avro/avro_schema_util_internal.h 
b/src/iceberg/avro/avro_schema_util_internal.h
index d0beb09..50ff9b2 100644
--- a/src/iceberg/avro/avro_schema_util_internal.h
+++ b/src/iceberg/avro/avro_schema_util_internal.h
@@ -24,6 +24,7 @@
 #include <avro/Node.hh>
 
 #include "iceberg/result.h"
+#include "iceberg/schema_util.h"
 #include "iceberg/type.h"
 
 namespace avro {
@@ -121,4 +122,17 @@ class HasIdVisitor {
   size_t fields_with_id_ = 0;
 };
 
+/// \brief Project an Iceberg Schema onto an Avro NodePtr.
+///
+/// This function creates a projection from an Iceberg Schema to an Avro 
schema node.
+/// The projection determines how to read data from the Avro schema into the 
expected
+/// Iceberg Schema.
+///
+/// \param expected_schema The Iceberg Schema that defines the expected 
structure.
+/// \param avro_node The Avro node to read data from.
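+/// \param prune_source When true, the `from` index of each projected struct field is
+/// renumbered to its position among the projected source fields only.
+/// \return The schema projection result, or an InvalidSchema error if the Avro schema
+/// cannot be read as the expected schema.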
+Result<SchemaProjection> Project(const Schema& expected_schema,
+                                 const ::avro::NodePtr& avro_node, bool 
prune_source);
+
 }  // namespace iceberg::avro
diff --git a/src/iceberg/schema_util.cc b/src/iceberg/schema_util.cc
index d8a5dcf..3e409ef 100644
--- a/src/iceberg/schema_util.cc
+++ b/src/iceberg/schema_util.cc
@@ -20,12 +20,12 @@
 #include "iceberg/schema_util.h"
 
 #include <format>
-#include <map>
 #include <string_view>
 #include <unordered_map>
 
 #include "iceberg/metadata_columns.h"
 #include "iceberg/schema.h"
+#include "iceberg/schema_util_internal.h"
 #include "iceberg/util/checked_cast.h"
 #include "iceberg/util/formatter_internal.h"
 #include "iceberg/util/macros.h"
@@ -78,25 +78,6 @@ Status ValidateSchemaEvolution(const Type& expected_type, 
const Type& source_typ
   return NotSupported("Cannot read {} from {}", expected_type, source_type);
 }
 
-// Fix `from` field of `FieldProjection` to use pruned field index.
-void PruneFieldProjection(FieldProjection& field_projection) {
-  std::map<size_t, size_t> local_index_to_pruned_index;
-  for (const auto& child_projection : field_projection.children) {
-    if (child_projection.kind == FieldProjection::Kind::kProjected) {
-      local_index_to_pruned_index.emplace(std::get<1>(child_projection.from), 
0);
-    }
-  }
-  for (size_t pruned_index = 0; auto& [_, value] : 
local_index_to_pruned_index) {
-    value = pruned_index++;
-  }
-  for (auto& child_projection : field_projection.children) {
-    if (child_projection.kind == FieldProjection::Kind::kProjected) {
-      child_projection.from =
-          local_index_to_pruned_index.at(std::get<1>(child_projection.from));
-    }
-  }
-}
-
 Result<FieldProjection> ProjectNested(const Type& expected_type, const Type& 
source_type,
                                       bool prune_source) {
   if (!expected_type.is_nested()) {
diff --git a/src/iceberg/schema_util_internal.h 
b/src/iceberg/schema_util_internal.h
new file mode 100644
index 0000000..33aad93
--- /dev/null
+++ b/src/iceberg/schema_util_internal.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <map>
+
+#include "iceberg/schema_util.h"
+
+namespace iceberg {
+
+// Fix `from` field of `FieldProjection` to use pruned field index.
+//
+// Collects the source indexes of all children of kind kProjected, ranks them in
+// ascending order, and rewrites each such child's `from` to its rank. Children of
+// any other kind are left untouched. Only the direct children are rewritten.
+//
+// Marked `inline` because this header is included by more than one source file;
+// a non-inline definition here would be defined once per including file.
+inline void PruneFieldProjection(FieldProjection& field_projection) {
+  // Ordered map: iteration yields the source indexes in ascending order.
+  std::map<size_t, size_t> local_index_to_pruned_index;
+  for (const auto& child_projection : field_projection.children) {
+    if (child_projection.kind == FieldProjection::Kind::kProjected) {
+      local_index_to_pruned_index.emplace(std::get<1>(child_projection.from), 0);
+    }
+  }
+  for (size_t pruned_index = 0; auto& [_, value] : local_index_to_pruned_index) {
+    value = pruned_index++;
+  }
+  for (auto& child_projection : field_projection.children) {
+    if (child_projection.kind == FieldProjection::Kind::kProjected) {
+      child_projection.from =
+          local_index_to_pruned_index.at(std::get<1>(child_projection.from));
+    }
+  }
+}
+
+}  // namespace iceberg
diff --git a/test/avro_schema_test.cc b/test/avro_schema_test.cc
index b24354f..7d18ae8 100644
--- a/test/avro_schema_test.cc
+++ b/test/avro_schema_test.cc
@@ -25,6 +25,8 @@
 #include <gtest/gtest.h>
 
 #include "iceberg/avro/avro_schema_util_internal.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
 #include "matchers.h"
 
 namespace iceberg::avro {
@@ -515,4 +517,556 @@ TEST(HasIdVisitorTest, ArrayBackedMapWithPartialIds) {
   EXPECT_FALSE(visitor.AllHaveIds());
 }
 
+// An Avro schema with the same four fields as the expected schema projects every
+// expected field from the Avro field at the same position.
+TEST(AvroSchemaProjectionTest, ProjectIdenticalSchemas) {
+  // Expected Iceberg schema.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared<StringType>()),
+      SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared<IntType>()),
+      SchemaField::MakeRequired(/*field_id=*/4, "data", std::make_shared<DoubleType>()),
+  });
+
+  // Avro schema with the same fields and field ids.
+  const std::string schema_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "id", "type": "long", "field-id": 1},
+      {"name": "name", "type": ["null", "string"], "field-id": 2},
+      {"name": "age", "type": ["null", "int"], "field-id": 3},
+      {"name": "data", "type": "double", "field-id": 4}
+    ]
+  })";
+  auto compiled = ::avro::compileJsonSchemaFromString(schema_json);
+
+  auto projected = Project(iceberg_schema, compiled.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  const auto& fields = (*projected).fields;
+  ASSERT_EQ(fields.size(), 4);
+  for (size_t pos = 0; pos < fields.size(); ++pos) {
+    const auto& field = fields[pos];
+    ASSERT_EQ(field.kind, FieldProjection::Kind::kProjected);
+    ASSERT_EQ(std::get<1>(field.from), pos);
+  }
+}
+
+// Without pruning, a subset of the Avro fields is projected from the original
+// positions of those fields in the Avro record (0 and 2 here).
+TEST(AvroSchemaProjectionTest, ProjectSubsetSchema) {
+  // Expected Iceberg schema: only two of the four Avro fields.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared<IntType>()),
+  });
+
+  // Full Avro schema.
+  const std::string schema_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "id", "type": "long", "field-id": 1},
+      {"name": "name", "type": ["null", "string"], "field-id": 2},
+      {"name": "age", "type": ["null", "int"], "field-id": 3},
+      {"name": "data", "type": "double", "field-id": 4}
+    ]
+  })";
+  auto compiled = ::avro::compileJsonSchemaFromString(schema_json);
+
+  auto projected = Project(iceberg_schema, compiled.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  const auto& fields = (*projected).fields;
+  ASSERT_EQ(fields.size(), 2);
+  const auto& id_field = fields[0];
+  ASSERT_EQ(id_field.kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(id_field.from), 0);
+  const auto& age_field = fields[1];
+  ASSERT_EQ(age_field.kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(age_field.from), 2);
+}
+
+// With pruning, the same subset is projected from consecutive positions (0 and 1),
+// i.e. indexes into the projected fields only.
+TEST(AvroSchemaProjectionTest, ProjectWithPruning) {
+  // Expected Iceberg schema: only two of the four Avro fields.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared<IntType>()),
+  });
+
+  // Full Avro schema.
+  const std::string schema_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "id", "type": "long", "field-id": 1},
+      {"name": "name", "type": ["null", "string"], "field-id": 2},
+      {"name": "age", "type": ["null", "int"], "field-id": 3},
+      {"name": "data", "type": "double", "field-id": 4}
+    ]
+  })";
+  auto compiled = ::avro::compileJsonSchemaFromString(schema_json);
+
+  auto projected = Project(iceberg_schema, compiled.root(), /*prune_source=*/true);
+  ASSERT_THAT(projected, IsOk());
+
+  const auto& fields = (*projected).fields;
+  ASSERT_EQ(fields.size(), 2);
+  const auto& id_field = fields[0];
+  ASSERT_EQ(id_field.kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(id_field.from), 0);
+  const auto& age_field = fields[1];
+  ASSERT_EQ(age_field.kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(age_field.from), 1);
+}
+
+// An optional expected field that is absent from the Avro schema is projected as
+// kNull, while the fields that are present are projected normally.
+TEST(AvroSchemaProjectionTest, ProjectMissingOptionalField) {
+  // Expected Iceberg schema with an extra optional field (id 10).
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared<StringType>()),
+      SchemaField::MakeOptional(/*field_id=*/10, "extra", std::make_shared<StringType>()),
+  });
+
+  // Avro schema that lacks the extra field.
+  const std::string schema_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "id", "type": "long", "field-id": 1},
+      {"name": "name", "type": ["null", "string"], "field-id": 2}
+    ]
+  })";
+  auto compiled = ::avro::compileJsonSchemaFromString(schema_json);
+
+  auto projected = Project(iceberg_schema, compiled.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  const auto& fields = (*projected).fields;
+  ASSERT_EQ(fields.size(), 3);
+  const auto& id_field = fields[0];
+  ASSERT_EQ(id_field.kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(id_field.from), 0);
+  const auto& name_field = fields[1];
+  ASSERT_EQ(name_field.kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(name_field.from), 1);
+  ASSERT_EQ(fields[2].kind, FieldProjection::Kind::kNull);
+}
+
+// A required expected field that is absent from the Avro schema makes the
+// projection fail with an InvalidSchema error.
+TEST(AvroSchemaProjectionTest, ProjectMissingRequiredField) {
+  // Expected Iceberg schema whose required field (id 10) is not in the Avro schema.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared<StringType>()),
+      SchemaField::MakeRequired(/*field_id=*/10, "extra", std::make_shared<StringType>()),
+  });
+
+  const std::string schema_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "id", "type": "long", "field-id": 1},
+      {"name": "name", "type": ["null", "string"], "field-id": 2}
+    ]
+  })";
+  auto compiled = ::avro::compileJsonSchemaFromString(schema_json);
+
+  auto projected = Project(iceberg_schema, compiled.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsError(ErrorKind::kInvalidSchema));
+  ASSERT_THAT(projected, HasErrorMessage("Missing required field"));
+}
+
+// A metadata column that is absent from the Avro schema is projected as kMetadata.
+TEST(AvroSchemaProjectionTest, ProjectMetadataColumn) {
+  // Expected Iceberg schema: one data field plus the file-path metadata column.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      MetadataColumns::kFilePath,
+  });
+
+  const std::string schema_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "id", "type": "long", "field-id": 1}
+    ]
+  })";
+  auto compiled = ::avro::compileJsonSchemaFromString(schema_json);
+
+  auto projected = Project(iceberg_schema, compiled.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  const auto& fields = (*projected).fields;
+  ASSERT_EQ(fields.size(), 2);
+  const auto& id_field = fields[0];
+  ASSERT_EQ(id_field.kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(id_field.from), 0);
+  ASSERT_EQ(fields[1].kind, FieldProjection::Kind::kMetadata);
+}
+
+// An Avro int can be read as an expected Iceberg long.
+TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionIntToLong) {
+  // Expected Iceberg schema wants a long.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+  });
+
+  // Avro schema stores an int.
+  const std::string schema_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "id", "type": "int", "field-id": 1}
+    ]
+  })";
+  auto compiled = ::avro::compileJsonSchemaFromString(schema_json);
+
+  auto projected = Project(iceberg_schema, compiled.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  const auto& fields = (*projected).fields;
+  ASSERT_EQ(fields.size(), 1);
+  const auto& only_field = fields[0];
+  ASSERT_EQ(only_field.kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(only_field.from), 0);
+}
+
+// An Avro float can be read as an expected Iceberg double.
+TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionFloatToDouble) {
+  // Expected Iceberg schema wants a double.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "value", std::make_shared<DoubleType>()),
+  });
+
+  // Avro schema stores a float.
+  const std::string schema_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "value", "type": "float", "field-id": 1}
+    ]
+  })";
+  auto compiled = ::avro::compileJsonSchemaFromString(schema_json);
+
+  auto projected = Project(iceberg_schema, compiled.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  const auto& fields = (*projected).fields;
+  ASSERT_EQ(fields.size(), 1);
+  const auto& only_field = fields[0];
+  ASSERT_EQ(only_field.kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(only_field.from), 0);
+}
+
+// An Avro string cannot be read as an expected Iceberg int: the projection fails
+// with an InvalidSchema error.
+TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionIncompatibleTypes) {
+  // Expected Iceberg schema wants an int.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "value", std::make_shared<IntType>()),
+  });
+
+  // Avro schema stores a string.
+  const std::string schema_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "value", "type": "string", "field-id": 1}
+    ]
+  })";
+  auto compiled = ::avro::compileJsonSchemaFromString(schema_json);
+
+  auto projected = Project(iceberg_schema, compiled.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsError(ErrorKind::kInvalidSchema));
+  ASSERT_THAT(projected, HasErrorMessage("Cannot read"));
+}
+
+// A nested struct is projected recursively: the struct field itself and each of
+// its children are marked kProjected with their own source positions.
+TEST(AvroSchemaProjectionTest, ProjectNestedStructures) {
+  // Expected Iceberg schema with a nested "address" struct.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(
+          /*field_id=*/3, "address",
+          std::make_shared<StructType>(std::vector<SchemaField>{
+              SchemaField::MakeOptional(/*field_id=*/101, "street",
+                                        std::make_shared<StringType>()),
+              SchemaField::MakeOptional(/*field_id=*/102, "city",
+                                        std::make_shared<StringType>()),
+          })),
+  });
+
+  // Equivalent Avro schema.
+  const std::string schema_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "id", "type": "long", "field-id": 1},
+      {"name": "address", "type": ["null", {
+        "type": "record",
+        "name": "address_record",
+        "fields": [
+          {"name": "street", "type": ["null", "string"], "field-id": 101},
+          {"name": "city", "type": ["null", "string"], "field-id": 102}
+        ]
+      }], "field-id": 3}
+    ]
+  })";
+  auto compiled = ::avro::compileJsonSchemaFromString(schema_json);
+
+  auto projected = Project(iceberg_schema, compiled.root(), /*prune_source=*/true);
+  ASSERT_THAT(projected, IsOk());
+
+  const auto& fields = (*projected).fields;
+  ASSERT_EQ(fields.size(), 2);
+  const auto& id_field = fields[0];
+  ASSERT_EQ(id_field.kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(id_field.from), 0);
+  const auto& address_field = fields[1];
+  ASSERT_EQ(address_field.kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(address_field.from), 1);
+
+  // The struct's children are mapped in order.
+  const auto& address_children = address_field.children;
+  ASSERT_EQ(address_children.size(), 2);
+  ASSERT_EQ(address_children[0].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(address_children[0].from), 0);
+  ASSERT_EQ(address_children[1].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(address_children[1].from), 1);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectListType) {
+  // Expected schema: required long "id" (id 1) and optional list<int> "numbers" (id 2).
+  Schema expected_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(
+          /*field_id=*/2, "numbers",
+          std::make_shared<ListType>(SchemaField::MakeOptional(
+              /*field_id=*/101, "element", std::make_shared<IntType>()))),
+  });
+
+  // Avro file schema: nullable array of nullable ints tagged with element-id 101.
+  std::string avro_schema_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "id", "type": "long", "field-id": 1},
+      {"name": "numbers", "type": ["null", {
+        "type": "array",
+        "items": ["null", "int"],
+        "element-id": 101
+      }], "field-id": 2}
+    ]
+  })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json);
+
+  auto projection_result =
+      Project(expected_schema, avro_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(projection_result, IsOk());
+
+  const auto& projection = *projection_result;  // only top-level fields are checked
+  ASSERT_EQ(projection.fields.size(), 2);
+  ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(projection.fields[0].from), 0);
+  ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(projection.fields[1].from), 1);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectMapType) {
+  // Reader side: optional map "counts" (id 1) whose required string key has id 101
+  // and whose optional int value has id 102.
+  auto map_type = std::make_shared<MapType>(
+      SchemaField::MakeRequired(/*field_id=*/101, "key", std::make_shared<StringType>()),
+      SchemaField::MakeOptional(/*field_id=*/102, "value", std::make_shared<IntType>()));
+  Schema read_schema({SchemaField::MakeOptional(/*field_id=*/1, "counts", map_type)});
+
+  // Writer side: a native Avro map (string keys are implicit) carrying key-id and
+  // value-id, wrapped in a ["null", ...] union.
+  const std::string writer_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "counts", "type": ["null", {
+        "type": "map",
+        "values": ["null", "int"],
+        "key-id": 101,
+        "value-id": 102
+      }], "field-id": 1}
+    ]
+  })";
+  auto writer_schema = ::avro::compileJsonSchemaFromString(writer_json);
+
+  auto result =
+      Project(read_schema, writer_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(result, IsOk());
+
+  // One projected top-level field whose `from` holds 0 in alternative 1, exposing two
+  // child projections (presumably one for the key and one for the value).
+  using Kind = FieldProjection::Kind;
+  const auto& top_level = (*result).fields;
+  ASSERT_EQ(top_level.size(), 1);
+  ASSERT_EQ(top_level[0].kind, Kind::kProjected);
+  ASSERT_EQ(std::get<1>(top_level[0].from), 0);
+  ASSERT_EQ(top_level[0].children.size(), 2);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectMapTypeWithNonStringKey) {
+  // Reader side: optional map "counts" (id 1) with required int keys (id 101) and
+  // optional string values (id 102).
+  auto map_type = std::make_shared<MapType>(
+      SchemaField::MakeRequired(/*field_id=*/101, "key", std::make_shared<IntType>()),
+      SchemaField::MakeOptional(/*field_id=*/102, "value",
+                                std::make_shared<StringType>()));
+  Schema read_schema({SchemaField::MakeOptional(/*field_id=*/1, "counts", map_type)});
+
+  // Writer side: the keys are not strings, so the map is written as an array of
+  // key_value records annotated with logicalType "map" instead of a native Avro map.
+  const std::string writer_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "counts", "type": ["null", {
+        "type": "array",
+        "items": {
+          "type": "record",
+          "name": "key_value",
+          "fields": [
+            {"name": "key", "type": "int", "field-id": 101},
+            {"name": "value", "type": ["null", "string"], "field-id": 102}
+          ]
+        },
+        "logicalType": "map"
+      }], "field-id": 1}
+    ]
+  })";
+  auto writer_schema = ::avro::compileJsonSchemaFromString(writer_json);
+
+  auto result = Project(read_schema, writer_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(result, IsOk());
+
+  // Same expectations as for a native map: one projected field whose `from` holds 0
+  // in alternative 1, with two child projections.
+  using Kind = FieldProjection::Kind;
+  const auto& top_level = (*result).fields;
+  ASSERT_EQ(top_level.size(), 1);
+  ASSERT_EQ(top_level[0].kind, Kind::kProjected);
+  ASSERT_EQ(std::get<1>(top_level[0].from), 0);
+  ASSERT_EQ(top_level[0].children.size(), 2);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectListOfStruct) {
+  // Reader side: optional list "items" (id 1) whose optional element (id 101) is a
+  // struct of optional int "x" (id 102) and required string "y" (id 103).
+  auto element_type = std::make_shared<StructType>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(/*field_id=*/102, "x", std::make_shared<IntType>()),
+      SchemaField::MakeRequired(/*field_id=*/103, "y", std::make_shared<StringType>()),
+  });
+  auto list_type = std::make_shared<ListType>(
+      SchemaField::MakeOptional(/*field_id=*/101, "element", element_type));
+  Schema read_schema({SchemaField::MakeOptional(/*field_id=*/1, "items", list_type)});
+
+  // Writer side: nullable Avro array (element-id 101) of nullable records whose
+  // fields carry the matching field ids.
+  const std::string writer_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {"name": "items", "type": ["null", {
+        "type": "array",
+        "items": ["null", {
+          "type": "record",
+          "name": "element_record",
+          "fields": [
+            {"name": "x", "type": ["null", "int"], "field-id": 102},
+            {"name": "y", "type": "string", "field-id": 103}
+          ]
+        }],
+        "element-id": 101
+      }], "field-id": 1}
+    ]
+  })";
+  auto writer_schema = ::avro::compileJsonSchemaFromString(writer_json);
+
+  auto result =
+      Project(read_schema, writer_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(result, IsOk());
+
+  using Kind = FieldProjection::Kind;
+  const auto& top_level = (*result).fields;
+  ASSERT_EQ(top_level.size(), 1);
+  ASSERT_EQ(top_level[0].kind, Kind::kProjected);
+  ASSERT_EQ(std::get<1>(top_level[0].from), 0);
+
+  // The list projection exposes exactly one child (its element). That element in
+  // turn exposes the two struct members, each projected with alternative 1 of
+  // `from` equal to the member's position (0 for "x", 1 for "y").
+  ASSERT_EQ(top_level[0].children.size(), 1);
+  const auto& element = top_level[0].children[0];
+  ASSERT_EQ(element.children.size(), 2);
+  ASSERT_EQ(element.children[0].kind, Kind::kProjected);
+  ASSERT_EQ(std::get<1>(element.children[0].from), 0);
+  ASSERT_EQ(element.children[1].kind, Kind::kProjected);
+  ASSERT_EQ(std::get<1>(element.children[1].from), 1);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectDecimalType) {
+  // Reader side: required decimal(18, 2) "value" with field id 1.
+  auto decimal_type = std::make_shared<DecimalType>(18, 2);
+  Schema read_schema({SchemaField::MakeRequired(/*field_id=*/1, "value", decimal_type)});
+
+  // Writer side: 4-byte fixed annotated as decimal(9, 2), i.e. the same scale but a
+  // smaller precision than the reader asks for; the projection is expected to succeed.
+  const std::string writer_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {
+        "name": "value",
+        "type": {
+          "type": "fixed",
+          "name": "decimal_9_2",
+          "size": 4,
+          "logicalType": "decimal",
+          "precision": 9,
+          "scale": 2
+        },
+        "field-id": 1
+      }
+    ]
+  })";
+  auto writer_schema = ::avro::compileJsonSchemaFromString(writer_json);
+
+  auto result = Project(read_schema, writer_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(result, IsOk());
+
+  // The single field is projected, with alternative 1 of `from` holding 0.
+  using Kind = FieldProjection::Kind;
+  const auto& top_level = (*result).fields;
+  ASSERT_EQ(top_level.size(), 1);
+  ASSERT_EQ(top_level[0].kind, Kind::kProjected);
+  ASSERT_EQ(std::get<1>(top_level[0].from), 0);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectDecimalIncompatible) {
+  // Reader side: required decimal(18, 3) "value" with field id 1.
+  auto decimal_type = std::make_shared<DecimalType>(18, 3);
+  Schema read_schema({SchemaField::MakeRequired(/*field_id=*/1, "value", decimal_type)});
+
+  // Writer side: 4-byte fixed annotated as decimal(9, 2). The scale (2) differs from
+  // the reader's (3), and the test expects Project to refuse the combination.
+  const std::string writer_json = R"({
+    "type": "record",
+    "name": "iceberg_schema",
+    "fields": [
+      {
+        "name": "value",
+        "type": {
+          "type": "fixed",
+          "name": "decimal_9_2",
+          "size": 4,
+          "logicalType": "decimal",
+          "precision": 9,
+          "scale": 2
+        },
+        "field-id": 1
+      }
+    ]
+  })";
+  auto writer_schema = ::avro::compileJsonSchemaFromString(writer_json);
+
+  auto result =
+      Project(read_schema, writer_schema.root(), /*prune_source=*/false);
+  // Failure must be reported as kInvalidSchema with a "Cannot read" message.
+  ASSERT_THAT(result, IsError(ErrorKind::kInvalidSchema));
+  ASSERT_THAT(result, HasErrorMessage("Cannot read"));
+}
+
 }  // namespace iceberg::avro


Reply via email to