This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 08da3a9 feat: add avro schema projection (#109)
08da3a9 is described below
commit 08da3a962fc518d9458366bd13e98e9ff1e101e6
Author: Gang Wu <[email protected]>
AuthorDate: Mon May 26 14:08:30 2025 +0800
feat: add avro schema projection (#109)
---
src/iceberg/avro/avro_schema_util.cc | 402 +++++++++++++++++++
src/iceberg/avro/avro_schema_util_internal.h | 14 +
src/iceberg/schema_util.cc | 21 +-
src/iceberg/schema_util_internal.h | 47 +++
test/avro_schema_test.cc | 554 +++++++++++++++++++++++++++
5 files changed, 1018 insertions(+), 20 deletions(-)
diff --git a/src/iceberg/avro/avro_schema_util.cc
b/src/iceberg/avro/avro_schema_util.cc
index 37a34d7..229c62b 100644
--- a/src/iceberg/avro/avro_schema_util.cc
+++ b/src/iceberg/avro/avro_schema_util.cc
@@ -31,6 +31,10 @@
#include <avro/ValidSchema.hh>
#include "iceberg/avro/avro_schema_util_internal.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_util_internal.h"
+#include "iceberg/util/formatter.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/visit_type.h"
@@ -377,4 +381,402 @@ Status HasIdVisitor::Visit(const ::avro::ValidSchema&
schema) {
Status HasIdVisitor::Visit(const ::avro::Schema& schema) { return
Visit(schema.root()); }
+namespace {
+
+/// Renders an Avro schema node as text by streaming it with operator<<.
+std::string ToString(const ::avro::NodePtr& node) {
+  std::ostringstream out;
+  out << *node;
+  return out.str();
+}
+
+/// Renders an Avro logical type as the text emitted by its printJson().
+std::string ToString(const ::avro::LogicalType& logical_type) {
+  std::ostringstream out;
+  logical_type.printJson(out);
+  return out.str();
+}
+
+/// Renders a logical type enumerator by wrapping it in a LogicalType first.
+std::string ToString(const ::avro::LogicalType::Type& logical_type) {
+  const ::avro::LogicalType wrapped(logical_type);
+  return ToString(wrapped);
+}
+
+/// Returns true when the node's logical type equals `expected_type`.
+bool HasLogicalType(const ::avro::NodePtr& node,
+                    ::avro::LogicalType::Type expected_type) {
+  const auto actual_type = node->logicalType().type();
+  return actual_type == expected_type;
+}
+
+/// Returns true when the node carries a custom logical type named "map".
+bool HasMapLogicalType(const ::avro::NodePtr& node) {
+  const auto logical_type = node->logicalType();
+  if (logical_type.type() != ::avro::LogicalType::CUSTOM) {
+    return false;
+  }
+  const auto custom_type = logical_type.customLogicalType();
+  return custom_type != nullptr && custom_type->name() == "map";
+}
+
+/// Reads the adjust-to-utc attribute from the node's first custom attribute set.
+///
+/// \return std::nullopt when the node has no custom attributes; otherwise whatever
+/// getAttribute() yields for the key.
+std::optional<std::string> GetAdjustToUtc(const ::avro::NodePtr& node) {
+  const bool has_attributes = node->customAttributes() != 0;
+  if (has_attributes) {
+    const std::string key(kAdjustToUtcProp);
+    return node->customAttributesAt(0).getAttribute(key);
+  }
+  return std::nullopt;
+}
+
+/// Reads an integer id stored as a custom attribute on an Avro node.
+///
+/// \param node The Avro node that owns the custom attributes.
+/// \param attr_name Name of the attribute holding the id.
+/// \param field_idx Index of the custom attribute set to read from.
+/// \return The parsed id, or InvalidSchema when `field_idx` is out of range, the
+/// attribute is missing, or its value is not entirely a valid integer.
+Result<int32_t> GetId(const ::avro::NodePtr& node, const std::string& attr_name,
+                      size_t field_idx) {
+  if (field_idx >= node->customAttributes()) {
+    return InvalidSchema("Field index {} exceeds available custom attributes {}",
+                         field_idx, node->customAttributes());
+  }
+
+  auto id_str = node->customAttributesAt(field_idx).getAttribute(attr_name);
+  if (!id_str.has_value()) {
+    return InvalidSchema("Missing avro attribute: {}", attr_name);
+  }
+
+  try {
+    // std::stoi stops at the first non-numeric character, so "12abc" would parse
+    // as 12. Require the whole value to be consumed to reject such ids.
+    size_t parsed_chars = 0;
+    const int id = std::stoi(id_str.value(), &parsed_chars);
+    if (parsed_chars == id_str->size()) {
+      return id;
+    }
+  } catch (const std::exception&) {
+    // Not a number or out of range: reported below.
+  }
+  return InvalidSchema("Invalid {}: {}", attr_name, id_str.value());
+}
+
+/// Reads the element id attribute from the first custom attribute set of `node`.
+Result<int32_t> GetElementId(const ::avro::NodePtr& node) {
+  static const auto attr_name = std::string(kElementIdProp);
+  constexpr size_t kAttributeIndex = 0;
+  return GetId(node, attr_name, kAttributeIndex);
+}
+
+/// Reads the key id attribute from the first custom attribute set of `node`.
+Result<int32_t> GetKeyId(const ::avro::NodePtr& node) {
+  static const auto attr_name = std::string(kKeyIdProp);
+  constexpr size_t kAttributeIndex = 0;
+  return GetId(node, attr_name, kAttributeIndex);
+}
+
+/// Reads the value id attribute from the first custom attribute set of `node`.
+Result<int32_t> GetValueId(const ::avro::NodePtr& node) {
+  static const auto attr_name = std::string(kValueIdProp);
+  constexpr size_t kAttributeIndex = 0;
+  return GetId(node, attr_name, kAttributeIndex);
+}
+
+/// Reads the field id attribute from the custom attribute set at `field_idx`.
+Result<int32_t> GetFieldId(const ::avro::NodePtr& node, size_t field_idx) {
+  static const auto attr_name = std::string(kFieldIdProp);
+  return GetId(node, attr_name, field_idx);
+}
+
+/// Checks that a non-nested Iceberg type can be read from the given Avro node.
+///
+/// Besides exact matches, these promotions are accepted: Avro int read as long,
+/// Avro float read as double, and a decimal with identical scale whose precision
+/// does not exceed the expected precision.
+///
+/// \param expected_type The Iceberg type requested by the reader.
+/// \param avro_node The Avro node to read from. Callers in this file pass the node
+/// after UnwrapUnion().
+/// \return OK when the node is readable as `expected_type`, InvalidSchema otherwise.
+Status ValidateAvroSchemaEvolution(const Type& expected_type,
+                                   const ::avro::NodePtr& avro_node) {
+  switch (expected_type.type_id()) {
+    case TypeId::kBoolean:
+      if (avro_node->type() == ::avro::AVRO_BOOL) {
+        return {};
+      }
+      break;
+    case TypeId::kInt:
+      if (avro_node->type() == ::avro::AVRO_INT) {
+        return {};
+      }
+      break;
+    case TypeId::kLong:
+      // int -> long promotion is allowed.
+      if (avro_node->type() == ::avro::AVRO_LONG ||
+          avro_node->type() == ::avro::AVRO_INT) {
+        return {};
+      }
+      break;
+    case TypeId::kFloat:
+      if (avro_node->type() == ::avro::AVRO_FLOAT) {
+        return {};
+      }
+      break;
+    case TypeId::kDouble:
+      // float -> double promotion is allowed.
+      if (avro_node->type() == ::avro::AVRO_DOUBLE ||
+          avro_node->type() == ::avro::AVRO_FLOAT) {
+        return {};
+      }
+      break;
+    case TypeId::kDate:
+      if (avro_node->type() == ::avro::AVRO_INT &&
+          HasLogicalType(avro_node, ::avro::LogicalType::DATE)) {
+        return {};
+      }
+      break;
+    case TypeId::kTime:
+      if (avro_node->type() == ::avro::AVRO_LONG &&
+          HasLogicalType(avro_node, ::avro::LogicalType::TIME_MICROS)) {
+        return {};
+      }
+      break;
+    case TypeId::kTimestamp:
+      // A timestamp without zone is the adjust-to-utc=false encoding; a missing
+      // attribute is treated as false. Requiring "true" here would reject valid
+      // timestamp columns and accept timestamptz data instead.
+      if (avro_node->type() == ::avro::AVRO_LONG &&
+          HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_MICROS) &&
+          GetAdjustToUtc(avro_node).value_or("false") == "false") {
+        return {};
+      }
+      break;
+    case TypeId::kTimestampTz:
+      // A timestamp with zone must be explicitly marked adjust-to-utc=true.
+      if (avro_node->type() == ::avro::AVRO_LONG &&
+          HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_MICROS) &&
+          GetAdjustToUtc(avro_node).value_or("false") == "true") {
+        return {};
+      }
+      break;
+    case TypeId::kString:
+      if (avro_node->type() == ::avro::AVRO_STRING) {
+        return {};
+      }
+      break;
+    case TypeId::kDecimal:
+      if (avro_node->type() == ::avro::AVRO_FIXED &&
+          HasLogicalType(avro_node, ::avro::LogicalType::DECIMAL)) {
+        const auto& decimal_type =
+            internal::checked_cast<const DecimalType&>(expected_type);
+        const auto logical_type = avro_node->logicalType();
+        // Scale must match exactly; precision may only widen.
+        if (decimal_type.scale() == logical_type.scale() &&
+            decimal_type.precision() >= logical_type.precision()) {
+          return {};
+        }
+      }
+      break;
+    case TypeId::kUuid:
+      if (avro_node->type() == ::avro::AVRO_FIXED && avro_node->fixedSize() == 16 &&
+          HasLogicalType(avro_node, ::avro::LogicalType::UUID)) {
+        return {};
+      }
+      break;
+    case TypeId::kFixed:
+      if (avro_node->type() == ::avro::AVRO_FIXED &&
+          avro_node->fixedSize() ==
+              internal::checked_cast<const FixedType&>(expected_type).length()) {
+        return {};
+      }
+      break;
+    case TypeId::kBinary:
+      if (avro_node->type() == ::avro::AVRO_BYTES) {
+        return {};
+      }
+      break;
+    default:
+      break;
+  }
+
+  return InvalidSchema("Cannot read Iceberg type: {} from Avro type: {}", expected_type,
+                       ToString(avro_node));
+}
+
+// XXX: Result<::avro::NodePtr> leads to unresolved external symbol error on
Windows.
+Status UnwrapUnion(const ::avro::NodePtr& node, ::avro::NodePtr* result) {
+ if (node->type() != ::avro::AVRO_UNION) {
+ *result = node;
+ return {};
+ }
+ if (node->leaves() != 2) {
+ return InvalidSchema("Union type must have exactly two branches");
+ }
+ auto branch_0 = node->leafAt(0);
+ auto branch_1 = node->leafAt(1);
+ if (branch_0->type() == ::avro::AVRO_NULL) {
+ *result = branch_1;
+ } else if (branch_1->type() == ::avro::AVRO_NULL) {
+ *result = branch_0;
+ } else {
+ return InvalidSchema("Union type must have exactly one null branch, got
{}",
+ ToString(node));
+ }
+ return {};
+}
+
+// Forward declaration
+Result<FieldProjection> ProjectNested(const Type& expected_type,
+ const ::avro::NodePtr& avro_node,
+ bool prune_source);
+
+/// Projects an Iceberg struct onto an Avro record by matching field ids.
+///
+/// Each expected field becomes one child projection, in the order of `struct_type`:
+/// projected from the Avro field with the same id, or marked as metadata, or marked
+/// as null when optional and absent. A missing required field is an error.
+///
+/// \param struct_type The expected Iceberg struct.
+/// \param avro_node The Avro node, which must be a record.
+/// \param prune_source When true, PruneFieldProjection() is applied to the result.
+Result<FieldProjection> ProjectStruct(const StructType& struct_type,
+                                      const ::avro::NodePtr& avro_node,
+                                      bool prune_source) {
+  if (avro_node->type() != ::avro::AVRO_RECORD) {
+    return InvalidSchema("Expected AVRO_RECORD type, but got {}", ToString(avro_node));
+  }
+
+  // Position and schema node of one field of the Avro record.
+  struct SourceField {
+    size_t position;
+    ::avro::NodePtr node;
+  };
+
+  // Index the Avro record fields by field id, rejecting duplicate ids.
+  const size_t source_field_count = avro_node->leaves();
+  std::unordered_map<int32_t, SourceField> source_fields_by_id;
+  source_fields_by_id.reserve(source_field_count);
+  for (size_t pos = 0; pos < source_field_count; ++pos) {
+    ICEBERG_ASSIGN_OR_RAISE(int32_t source_id, GetFieldId(avro_node, pos));
+    const bool is_new_id =
+        source_fields_by_id.try_emplace(source_id, pos, avro_node->leafAt(pos)).second;
+    if (!is_new_id) [[unlikely]] {
+      return InvalidSchema("Duplicate field id found in Avro schema: {}", source_id);
+    }
+  }
+
+  const auto& target_fields = struct_type.fields();
+  FieldProjection projection;
+  projection.children.reserve(target_fields.size());
+
+  for (const auto& target_field : target_fields) {
+    const int32_t target_id = target_field.field_id();
+    FieldProjection child;
+
+    const auto match = source_fields_by_id.find(target_id);
+    if (match != source_fields_by_id.cend()) {
+      const SourceField& source = match->second;
+      ::avro::NodePtr unwrapped;
+      ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(source.node, &unwrapped));
+      if (target_field.type()->is_nested()) {
+        ICEBERG_ASSIGN_OR_RAISE(
+            child, ProjectNested(*target_field.type(), unwrapped, prune_source));
+      } else {
+        ICEBERG_RETURN_UNEXPECTED(
+            ValidateAvroSchemaEvolution(*target_field.type(), unwrapped));
+      }
+      // Set after the nested projection, which replaces `child` wholesale.
+      child.from = source.position;
+      child.kind = FieldProjection::Kind::kProjected;
+    } else if (MetadataColumns::IsMetadataColumn(target_id)) {
+      child.kind = FieldProjection::Kind::kMetadata;
+    } else if (target_field.optional()) {
+      child.kind = FieldProjection::Kind::kNull;
+    } else {
+      return InvalidSchema("Missing required field with ID: {}", target_id);
+    }
+
+    projection.children.emplace_back(std::move(child));
+  }
+
+  if (prune_source) {
+    PruneFieldProjection(projection);
+  }
+
+  return projection;
+}
+
+/// Projects an Iceberg list onto an Avro array.
+///
+/// The Avro array must have exactly one item node and its element id must equal the
+/// id of the expected element field. The returned projection has a single child
+/// describing the element.
+///
+/// \param list_type The expected Iceberg list.
+/// \param avro_node The Avro node, which must be an array.
+/// \param prune_source Forwarded to nested projections of the element type.
+Result<FieldProjection> ProjectList(const ListType& list_type,
+                                    const ::avro::NodePtr& avro_node, bool prune_source) {
+  if (avro_node->type() != ::avro::AVRO_ARRAY) {
+    return InvalidSchema("Expected AVRO_ARRAY type, but got {}", ToString(avro_node));
+  }
+  if (avro_node->leaves() != 1) {
+    return InvalidSchema("Array type must have exactly one node, got {}",
+                         avro_node->leaves());
+  }
+
+  const auto& expected_element_field = list_type.fields().back();
+  ICEBERG_ASSIGN_OR_RAISE(int32_t avro_element_id, GetElementId(avro_node));
+  if (expected_element_field.field_id() != avro_element_id) [[unlikely]] {
+    return InvalidSchema("element-id mismatch, expected {}, got {}",
+                         expected_element_field.field_id(), avro_element_id);
+  }
+
+  FieldProjection element_projection;
+  ::avro::NodePtr element_node;
+  ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node->leafAt(0), &element_node));
+  if (expected_element_field.type()->is_nested()) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        element_projection,
+        ProjectNested(*expected_element_field.type(), element_node, prune_source));
+  } else {
+    ICEBERG_RETURN_UNEXPECTED(
+        ValidateAvroSchemaEvolution(*expected_element_field.type(), element_node));
+  }
+
+  // Mark the element as read from the array's only item node, matching what
+  // ProjectStruct and ProjectMap record for their children. Without this the
+  // element's kind and source index are never assigned.
+  element_projection.kind = FieldProjection::Kind::kProjected;
+  element_projection.from = size_t{0};
+
+  FieldProjection result;
+  result.children.emplace_back(std::move(element_projection));
+  return result;
+}
+
+/// Projects an Iceberg map onto an Avro map, or onto an array that carries the "map"
+/// logical type and whose item is a two-field record.
+///
+/// The key and value ids found in Avro must equal the expected key and value field
+/// ids. The returned projection has one child per leaf of the key/value node, each
+/// marked as projected from its leaf index.
+///
+/// \param map_type The expected Iceberg map.
+/// \param avro_node The Avro node holding the map.
+/// \param prune_source Forwarded to nested projections of the key and value types.
+Result<FieldProjection> ProjectMap(const MapType& map_type,
+                                   const ::avro::NodePtr& avro_node, bool prune_source) {
+  // Node whose leaves are the key and value schemas, plus the ids read from Avro.
+  ::avro::NodePtr entry_node;
+  int32_t source_key_id;
+  int32_t source_value_id;
+
+  const auto avro_type = avro_node->type();
+  if (avro_type == ::avro::AVRO_MAP) {
+    if (avro_node->leaves() != 2) {
+      return InvalidSchema("Map type must have exactly two nodes, got {}",
+                           avro_node->leaves());
+    }
+    entry_node = avro_node;
+
+    ICEBERG_ASSIGN_OR_RAISE(source_key_id, GetKeyId(avro_node));
+    ICEBERG_ASSIGN_OR_RAISE(source_value_id, GetValueId(avro_node));
+  } else if (avro_type == ::avro::AVRO_ARRAY && HasMapLogicalType(avro_node)) {
+    if (avro_node->leaves() != 1) {
+      return InvalidSchema("Array-backed map type must have exactly one node, got {}",
+                           avro_node->leaves());
+    }
+
+    entry_node = avro_node->leafAt(0);
+    const bool is_key_value_record =
+        entry_node->type() == ::avro::AVRO_RECORD && entry_node->leaves() == 2;
+    if (!is_key_value_record) {
+      return InvalidSchema(
+          "Array-backed map type must have a record node with two fields");
+    }
+
+    // The record's fields carry the ids: field 0 is the key, field 1 the value.
+    ICEBERG_ASSIGN_OR_RAISE(source_key_id, GetFieldId(entry_node, 0));
+    ICEBERG_ASSIGN_OR_RAISE(source_value_id, GetFieldId(entry_node, 1));
+  } else {
+    return InvalidSchema("Expected a map type, but got Avro type {}",
+                         ToString(avro_node));
+  }
+
+  const auto& expected_key_field = map_type.key();
+  if (expected_key_field.field_id() != source_key_id) {
+    return InvalidSchema("key-id mismatch, expected {}, got {}",
+                         expected_key_field.field_id(), source_key_id);
+  }
+  const auto& expected_value_field = map_type.value();
+  if (expected_value_field.field_id() != source_value_id) {
+    return InvalidSchema("value-id mismatch, expected {}, got {}",
+                         expected_value_field.field_id(), source_value_id);
+  }
+
+  FieldProjection map_projection;
+  for (size_t idx = 0; idx < entry_node->leaves(); ++idx) {
+    ::avro::NodePtr unwrapped;
+    ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(entry_node->leafAt(idx), &unwrapped));
+
+    FieldProjection entry_projection;
+    const auto& expected_sub_field = map_type.fields()[idx];
+    if (expected_sub_field.type()->is_nested()) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          entry_projection,
+          ProjectNested(*expected_sub_field.type(), unwrapped, prune_source));
+    } else {
+      ICEBERG_RETURN_UNEXPECTED(
+          ValidateAvroSchemaEvolution(*expected_sub_field.type(), unwrapped));
+    }
+    entry_projection.kind = FieldProjection::Kind::kProjected;
+    entry_projection.from = idx;
+    map_projection.children.emplace_back(std::move(entry_projection));
+  }
+
+  return map_projection;
+}
+
+/// Dispatches projection of a nested Iceberg type (struct, list or map) to the
+/// matching Project* helper.
+///
+/// \return InvalidSchema when `expected_type` is not nested or is a nested type other
+/// than struct, list or map.
+Result<FieldProjection> ProjectNested(const Type& expected_type,
+                                      const ::avro::NodePtr& avro_node,
+                                      bool prune_source) {
+  if (!expected_type.is_nested()) {
+    return InvalidSchema("Expected a nested type, but got {}", expected_type);
+  }
+
+  const auto type_id = expected_type.type_id();
+  if (type_id == TypeId::kStruct) {
+    const auto& struct_type = internal::checked_cast<const StructType&>(expected_type);
+    return ProjectStruct(struct_type, avro_node, prune_source);
+  }
+  if (type_id == TypeId::kList) {
+    const auto& list_type = internal::checked_cast<const ListType&>(expected_type);
+    return ProjectList(list_type, avro_node, prune_source);
+  }
+  if (type_id == TypeId::kMap) {
+    const auto& map_type = internal::checked_cast<const MapType&>(expected_type);
+    return ProjectMap(map_type, avro_node, prune_source);
+  }
+  return InvalidSchema("Unsupported nested type: {}", expected_type);
+}
+
+} // namespace
+
+// Projects the schema as a nested type and returns the children of the resulting
+// root projection as the schema's field projections.
+Result<SchemaProjection> Project(const Schema& expected_schema,
+                                 const ::avro::NodePtr& avro_node, bool prune_source) {
+  const Type& root_type = expected_schema;
+  ICEBERG_ASSIGN_OR_RAISE(auto root_projection,
+                          ProjectNested(root_type, avro_node, prune_source));
+  return SchemaProjection{std::move(root_projection.children)};
+}
+
} // namespace iceberg::avro
diff --git a/src/iceberg/avro/avro_schema_util_internal.h
b/src/iceberg/avro/avro_schema_util_internal.h
index d0beb09..50ff9b2 100644
--- a/src/iceberg/avro/avro_schema_util_internal.h
+++ b/src/iceberg/avro/avro_schema_util_internal.h
@@ -24,6 +24,7 @@
#include <avro/Node.hh>
#include "iceberg/result.h"
+#include "iceberg/schema_util.h"
#include "iceberg/type.h"
namespace avro {
@@ -121,4 +122,17 @@ class HasIdVisitor {
size_t fields_with_id_ = 0;
};
+/// \brief Project an Iceberg Schema onto an Avro NodePtr.
+///
+/// This function creates a projection from an Iceberg Schema to an Avro schema node.
+/// The projection determines how to read data from the Avro schema into the expected
+/// Iceberg Schema.
+///
+/// \param expected_schema The Iceberg Schema that defines the expected structure.
+/// \param avro_node The Avro node to read data from.
+/// \param prune_source Whether the source schema can be pruned. If true, the source
+/// index of each projected struct field is renumbered as if unprojected source
+/// fields had been removed.
+/// \return The schema projection result.
+Result<SchemaProjection> Project(const Schema& expected_schema,
+ const ::avro::NodePtr& avro_node, bool
prune_source);
+
} // namespace iceberg::avro
diff --git a/src/iceberg/schema_util.cc b/src/iceberg/schema_util.cc
index d8a5dcf..3e409ef 100644
--- a/src/iceberg/schema_util.cc
+++ b/src/iceberg/schema_util.cc
@@ -20,12 +20,12 @@
#include "iceberg/schema_util.h"
#include <format>
-#include <map>
#include <string_view>
#include <unordered_map>
#include "iceberg/metadata_columns.h"
#include "iceberg/schema.h"
+#include "iceberg/schema_util_internal.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/formatter_internal.h"
#include "iceberg/util/macros.h"
@@ -78,25 +78,6 @@ Status ValidateSchemaEvolution(const Type& expected_type,
const Type& source_typ
return NotSupported("Cannot read {} from {}", expected_type, source_type);
}
-// Fix `from` field of `FieldProjection` to use pruned field index.
-void PruneFieldProjection(FieldProjection& field_projection) {
- std::map<size_t, size_t> local_index_to_pruned_index;
- for (const auto& child_projection : field_projection.children) {
- if (child_projection.kind == FieldProjection::Kind::kProjected) {
- local_index_to_pruned_index.emplace(std::get<1>(child_projection.from),
0);
- }
- }
- for (size_t pruned_index = 0; auto& [_, value] :
local_index_to_pruned_index) {
- value = pruned_index++;
- }
- for (auto& child_projection : field_projection.children) {
- if (child_projection.kind == FieldProjection::Kind::kProjected) {
- child_projection.from =
- local_index_to_pruned_index.at(std::get<1>(child_projection.from));
- }
- }
-}
-
Result<FieldProjection> ProjectNested(const Type& expected_type, const Type&
source_type,
bool prune_source) {
if (!expected_type.is_nested()) {
diff --git a/src/iceberg/schema_util_internal.h
b/src/iceberg/schema_util_internal.h
new file mode 100644
index 0000000..33aad93
--- /dev/null
+++ b/src/iceberg/schema_util_internal.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <map>
+
+#include "iceberg/schema_util.h"
+
+namespace iceberg {
+
+// Fix `from` field of `FieldProjection` to use pruned field index.
+//
+// The source indices of all projected children are collected in ascending order and
+// renumbered 0, 1, 2, ...; each projected child's `from` is then replaced by its new
+// index. Children of any other kind are left untouched.
+//
+// Defined `inline` because this header is included by more than one translation
+// unit; a non-inline definition here would violate the one-definition rule and
+// produce duplicate-symbol link errors.
+inline void PruneFieldProjection(FieldProjection& field_projection) {
+  // std::map keeps keys sorted, which gives the pruned ordering for free.
+  std::map<size_t, size_t> local_index_to_pruned_index;
+  for (const auto& child_projection : field_projection.children) {
+    if (child_projection.kind == FieldProjection::Kind::kProjected) {
+      local_index_to_pruned_index.emplace(std::get<1>(child_projection.from), 0);
+    }
+  }
+  for (size_t pruned_index = 0; auto& [_, value] : local_index_to_pruned_index) {
+    value = pruned_index++;
+  }
+  for (auto& child_projection : field_projection.children) {
+    if (child_projection.kind == FieldProjection::Kind::kProjected) {
+      child_projection.from =
+          local_index_to_pruned_index.at(std::get<1>(child_projection.from));
+    }
+  }
+}
+
+} // namespace iceberg
diff --git a/test/avro_schema_test.cc b/test/avro_schema_test.cc
index b24354f..7d18ae8 100644
--- a/test/avro_schema_test.cc
+++ b/test/avro_schema_test.cc
@@ -25,6 +25,8 @@
#include <gtest/gtest.h>
#include "iceberg/avro/avro_schema_util_internal.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
#include "matchers.h"
namespace iceberg::avro {
@@ -515,4 +517,556 @@ TEST(HasIdVisitorTest, ArrayBackedMapWithPartialIds) {
EXPECT_FALSE(visitor.AllHaveIds());
}
+TEST(AvroSchemaProjectionTest, ProjectIdenticalSchemas) {
+  // Avro schema of the data being read.
+  std::string avro_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "id", "type": "long", "field-id": 1},
+ {"name": "name", "type": ["null", "string"], "field-id": 2},
+ {"name": "age", "type": ["null", "int"], "field-id": 3},
+ {"name": "data", "type": "double", "field-id": 4}
+ ]
+ })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_json);
+
+  // Iceberg schema that mirrors the Avro schema field for field.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared<StringType>()),
+      SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared<IntType>()),
+      SchemaField::MakeRequired(/*field_id=*/4, "data", std::make_shared<DoubleType>()),
+  });
+
+  auto projected = Project(iceberg_schema, avro_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  // Every expected field is read from the Avro field at the same position.
+  const auto& fields = projected->fields;
+  ASSERT_EQ(fields.size(), 4);
+  size_t position = 0;
+  for (const auto& field : fields) {
+    ASSERT_EQ(field.kind, FieldProjection::Kind::kProjected);
+    ASSERT_EQ(std::get<1>(field.from), position);
+    ++position;
+  }
+}
+
+TEST(AvroSchemaProjectionTest, ProjectSubsetSchema) {
+  // Avro schema with all four fields.
+  std::string avro_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "id", "type": "long", "field-id": 1},
+ {"name": "name", "type": ["null", "string"], "field-id": 2},
+ {"name": "age", "type": ["null", "int"], "field-id": 3},
+ {"name": "data", "type": "double", "field-id": 4}
+ ]
+ })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_json);
+
+  // Iceberg schema that selects only two of them.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared<IntType>()),
+  });
+
+  auto projected = Project(iceberg_schema, avro_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  // Without pruning, `from` is the position in the full Avro record.
+  const auto& fields = projected->fields;
+  ASSERT_EQ(fields.size(), 2);
+  ASSERT_EQ(fields[0].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(fields[0].from), 0);
+  ASSERT_EQ(fields[1].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(fields[1].from), 2);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectWithPruning) {
+  // Avro schema with all four fields.
+  std::string avro_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "id", "type": "long", "field-id": 1},
+ {"name": "name", "type": ["null", "string"], "field-id": 2},
+ {"name": "age", "type": ["null", "int"], "field-id": 3},
+ {"name": "data", "type": "double", "field-id": 4}
+ ]
+ })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_json);
+
+  // Iceberg schema that selects only two of them.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(/*field_id=*/3, "age", std::make_shared<IntType>()),
+  });
+
+  auto projected = Project(iceberg_schema, avro_schema.root(), /*prune_source=*/true);
+  ASSERT_THAT(projected, IsOk());
+
+  // With pruning, `from` is renumbered over the projected fields only.
+  const auto& fields = projected->fields;
+  ASSERT_EQ(fields.size(), 2);
+  ASSERT_EQ(fields[0].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(fields[0].from), 0);
+  ASSERT_EQ(fields[1].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(fields[1].from), 1);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectMissingOptionalField) {
+  // Avro schema that lacks the optional "extra" field.
+  std::string avro_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "id", "type": "long", "field-id": 1},
+ {"name": "name", "type": ["null", "string"], "field-id": 2}
+ ]
+ })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_json);
+
+  // Iceberg schema with one more optional field than the Avro schema has.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared<StringType>()),
+      SchemaField::MakeOptional(/*field_id=*/10, "extra",
+                                std::make_shared<StringType>()),
+  });
+
+  auto projected = Project(iceberg_schema, avro_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  // The missing optional field is projected as null.
+  const auto& fields = projected->fields;
+  ASSERT_EQ(fields.size(), 3);
+  ASSERT_EQ(fields[0].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(fields[0].from), 0);
+  ASSERT_EQ(fields[1].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(fields[1].from), 1);
+  ASSERT_EQ(fields[2].kind, FieldProjection::Kind::kNull);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectMissingRequiredField) {
+  // Avro schema that lacks the required "extra" field.
+  std::string avro_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "id", "type": "long", "field-id": 1},
+ {"name": "name", "type": ["null", "string"], "field-id": 2}
+ ]
+ })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_json);
+
+  // Iceberg schema with a required field that is missing from the Avro schema.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(/*field_id=*/2, "name", std::make_shared<StringType>()),
+      SchemaField::MakeRequired(/*field_id=*/10, "extra",
+                                std::make_shared<StringType>()),
+  });
+
+  auto projected = Project(iceberg_schema, avro_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsError(ErrorKind::kInvalidSchema));
+  ASSERT_THAT(projected, HasErrorMessage("Missing required field"));
+}
+
+TEST(AvroSchemaProjectionTest, ProjectMetadataColumn) {
+  // Avro schema that only stores the "id" data column.
+  std::string avro_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "id", "type": "long", "field-id": 1}
+ ]
+ })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_json);
+
+  // Iceberg schema that additionally requests a metadata column.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      MetadataColumns::kFilePath,
+  });
+
+  auto projected = Project(iceberg_schema, avro_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  // The metadata column is not read from Avro and is marked as metadata.
+  const auto& fields = projected->fields;
+  ASSERT_EQ(fields.size(), 2);
+  ASSERT_EQ(fields[0].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(fields[0].from), 0);
+  ASSERT_EQ(fields[1].kind, FieldProjection::Kind::kMetadata);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionIntToLong) {
+  // Avro schema that stores the column as an int.
+  std::string avro_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "id", "type": "int", "field-id": 1}
+ ]
+ })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_json);
+
+  // Iceberg schema that reads the same column as a long.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+  });
+
+  auto projected = Project(iceberg_schema, avro_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  const auto& fields = projected->fields;
+  ASSERT_EQ(fields.size(), 1);
+  ASSERT_EQ(fields[0].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(fields[0].from), 0);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionFloatToDouble) {
+  // Avro schema that stores the column as a float.
+  std::string avro_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "value", "type": "float", "field-id": 1}
+ ]
+ })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_json);
+
+  // Iceberg schema that reads the same column as a double.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "value",
+                                std::make_shared<DoubleType>()),
+  });
+
+  auto projected = Project(iceberg_schema, avro_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsOk());
+
+  const auto& fields = projected->fields;
+  ASSERT_EQ(fields.size(), 1);
+  ASSERT_EQ(fields[0].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(fields[0].from), 0);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionIncompatibleTypes) {
+  // Avro schema that stores the column as a string.
+  std::string avro_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "value", "type": "string", "field-id": 1}
+ ]
+ })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_json);
+
+  // Iceberg schema that tries to read the same column as an int.
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "value", std::make_shared<IntType>()),
+  });
+
+  auto projected = Project(iceberg_schema, avro_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(projected, IsError(ErrorKind::kInvalidSchema));
+  ASSERT_THAT(projected, HasErrorMessage("Cannot read"));
+}
+
+TEST(AvroSchemaProjectionTest, ProjectNestedStructures) {
+  // Avro schema with a nullable nested record.
+  std::string avro_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "id", "type": "long", "field-id": 1},
+ {"name": "address", "type": ["null", {
+ "type": "record",
+ "name": "address_record",
+ "fields": [
+ {"name": "street", "type": ["null", "string"], "field-id": 101},
+ {"name": "city", "type": ["null", "string"], "field-id": 102}
+ ]
+ }], "field-id": 3}
+ ]
+ })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_json);
+
+  // Equivalent Iceberg schema with a nested struct.
+  auto address_type = std::make_shared<StructType>(std::vector<SchemaField>{
+      SchemaField::MakeOptional(/*field_id=*/101, "street",
+                                std::make_shared<StringType>()),
+      SchemaField::MakeOptional(/*field_id=*/102, "city",
+                                std::make_shared<StringType>()),
+  });
+  Schema iceberg_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(/*field_id=*/3, "address", address_type),
+  });
+
+  auto projected = Project(iceberg_schema, avro_schema.root(), /*prune_source=*/true);
+  ASSERT_THAT(projected, IsOk());
+
+  const auto& fields = projected->fields;
+  ASSERT_EQ(fields.size(), 2);
+  ASSERT_EQ(fields[0].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(fields[0].from), 0);
+  ASSERT_EQ(fields[1].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(fields[1].from), 1);
+
+  // The struct's own fields are mapped one-to-one as well.
+  const auto& address_fields = fields[1].children;
+  ASSERT_EQ(address_fields.size(), 2);
+  ASSERT_EQ(address_fields[0].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(address_fields[0].from), 0);
+  ASSERT_EQ(address_fields[1].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(address_fields[1].from), 1);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectListType) {
+  // Create iceberg schema with a list
+  Schema expected_schema({
+      SchemaField::MakeRequired(/*field_id=*/1, "id", std::make_shared<LongType>()),
+      SchemaField::MakeOptional(
+          /*field_id=*/2, "numbers",
+          std::make_shared<ListType>(SchemaField::MakeOptional(
+              /*field_id=*/101, "element", std::make_shared<IntType>()))),
+  });
+
+  // Create equivalent avro schema
+  std::string avro_schema_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "id", "type": "long", "field-id": 1},
+ {"name": "numbers", "type": ["null", {
+ "type": "array",
+ "items": ["null", "int"],
+ "element-id": 101
+ }], "field-id": 2}
+ ]
+ })";
+  auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json);
+
+  auto projection_result =
+      Project(expected_schema, avro_schema.root(), /*prune_source=*/false);
+  ASSERT_THAT(projection_result, IsOk());
+
+  const auto& projection = *projection_result;
+  ASSERT_EQ(projection.fields.size(), 2);
+  ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(projection.fields[0].from), 0);
+  ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kProjected);
+  ASSERT_EQ(std::get<1>(projection.fields[1].from), 1);
+
+  // The list projection carries exactly one child describing its element, just as
+  // the map test checks for the key/value children.
+  ASSERT_EQ(projection.fields[1].children.size(), 1);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectMapType) {  // Projects an Iceberg map<string, int> column onto an Avro "map" type.
+ // Expected Iceberg schema: optional map "counts" (field id 1) with required string key (id 101) and optional int value (id 102).
+ Schema expected_schema({
+ SchemaField::MakeOptional(
+ /*field_id=*/1, "counts",
+ std::make_shared<MapType>(
+ SchemaField::MakeRequired(/*field_id=*/101, "key",
+ std::make_shared<StringType>()),
+ SchemaField::MakeOptional(/*field_id=*/102, "value",
+ std::make_shared<IntType>()))),
+ });
+
+ // Avro counterpart: a ["null", map] union with nullable int values; the ids are supplied through "key-id" and "value-id".
+ std::string avro_schema_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "counts", "type": ["null", {
+ "type": "map",
+ "values": ["null", "int"],
+ "key-id": 101,
+ "value-id": 102
+ }], "field-id": 1}
+ ]
+ })";
+ auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json);
+
+ auto projection_result =
+ Project(expected_schema, avro_schema.root(), /*prune_source=*/false);
+ ASSERT_THAT(projection_result, IsOk());
+
+ const auto& projection = *projection_result;
+ ASSERT_EQ(projection.fields.size(), 1);  // the schema has a single top-level field
+ ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected);
+ ASSERT_EQ(std::get<1>(projection.fields[0].from), 0);  // alternative 1 of `from` is presumably the source field position -- confirm against FieldProjection
+ ASSERT_EQ(projection.fields[0].children.size(), 2);  // presumably one child each for key and value; their contents are not checked here
+}
+
+TEST(AvroSchemaProjectionTest, ProjectMapTypeWithNonStringKey) {  // Projects an Iceberg map<int, string> column onto an Avro array of key/value records.
+ // Expected Iceberg schema: optional map "counts" (field id 1) with required int key (id 101) and optional string value (id 102).
+ Schema expected_schema({
+ SchemaField::MakeOptional(
+ /*field_id=*/1, "counts",
+ std::make_shared<MapType>(
+ SchemaField::MakeRequired(/*field_id=*/101, "key",
+ std::make_shared<IntType>()),
+ SchemaField::MakeOptional(/*field_id=*/102, "value",
+ std::make_shared<StringType>()))),
+ });
+
+ // Avro counterpart: an array tagged "logicalType": "map" whose items are "key_value" records holding the key (field-id 101) and nullable value (field-id 102).
+ std::string avro_schema_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "counts", "type": ["null", {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "key_value",
+ "fields": [
+ {"name": "key", "type": "int", "field-id": 101},
+ {"name": "value", "type": ["null", "string"], "field-id": 102}
+ ]
+ },
+ "logicalType": "map"
+ }], "field-id": 1}
+ ]
+ })";
+ auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json);
+
+ auto projection_result =
+ Project(expected_schema, avro_schema.root(), /*prune_source=*/false);
+ ASSERT_THAT(projection_result, IsOk());
+
+ const auto& projection = *projection_result;
+ ASSERT_EQ(projection.fields.size(), 1);  // the schema has a single top-level field
+ ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected);
+ ASSERT_EQ(std::get<1>(projection.fields[0].from), 0);  // alternative 1 of `from` is presumably the source field position -- confirm against FieldProjection
+ ASSERT_EQ(projection.fields[0].children.size(), 2);  // presumably one child each for key and value; their contents are not checked here
+}
+
+TEST(AvroSchemaProjectionTest, ProjectListOfStruct) {  // Projects an Iceberg list<struct> column onto an Avro array of records.
+ // Expected Iceberg schema: optional list "items" (field id 1) whose optional element (id 101) is a struct of optional int "x" (102) and required string "y" (103).
+ Schema expected_schema({
+ SchemaField::MakeOptional(
+ /*field_id=*/1, "items",
+ std::make_shared<ListType>(SchemaField::MakeOptional(
+ /*field_id=*/101, "element",
+ std::make_shared<StructType>(std::vector<SchemaField>{
+ SchemaField::MakeOptional(/*field_id=*/102, "x",
+ std::make_shared<IntType>()),
+ SchemaField::MakeRequired(/*field_id=*/103, "y",
+ std::make_shared<StringType>()),
+ })))),
+ });
+
+ // Avro counterpart: a nullable array ("element-id": 101) of nullable "element_record" records with fields x (field-id 102) and y (field-id 103).
+ std::string avro_schema_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {"name": "items", "type": ["null", {
+ "type": "array",
+ "items": ["null", {
+ "type": "record",
+ "name": "element_record",
+ "fields": [
+ {"name": "x", "type": ["null", "int"], "field-id": 102},
+ {"name": "y", "type": "string", "field-id": 103}
+ ]
+ }],
+ "element-id": 101
+ }], "field-id": 1}
+ ]
+ })";
+ auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json);
+
+ auto projection_result =
+ Project(expected_schema, avro_schema.root(), /*prune_source=*/false);
+ ASSERT_THAT(projection_result, IsOk());
+
+ const auto& projection = *projection_result;
+ ASSERT_EQ(projection.fields.size(), 1);  // the schema has a single top-level field
+ ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected);
+ ASSERT_EQ(std::get<1>(projection.fields[0].from), 0);  // alternative 1 of `from` is presumably the source field position -- confirm against FieldProjection
+
+ // The list projection must hold exactly one child (the element), and that child must map struct fields x and y to positions 0 and 1.
+ ASSERT_EQ(projection.fields[0].children.size(), 1);
+ const auto& element_proj = projection.fields[0].children[0];
+ ASSERT_EQ(element_proj.children.size(), 2);
+ ASSERT_EQ(element_proj.children[0].kind, FieldProjection::Kind::kProjected);
+ ASSERT_EQ(std::get<1>(element_proj.children[0].from), 0);
+ ASSERT_EQ(element_proj.children[1].kind, FieldProjection::Kind::kProjected);
+ ASSERT_EQ(std::get<1>(element_proj.children[1].from), 1);
+}
+
+TEST(AvroSchemaProjectionTest, ProjectDecimalType) {  // Projection succeeds when the expected decimal has a larger precision but the same scale as the Avro decimal.
+ // Expected Iceberg schema: required decimal(18, 2) field "value" (field id 1).
+ Schema expected_schema({
+ SchemaField::MakeRequired(/*field_id=*/1, "value",
+ std::make_shared<DecimalType>(18, 2)),
+ });
+
+ // Avro side: a 4-byte fixed annotated as decimal with precision 9 and scale 2, i.e. same scale, smaller precision than expected.
+ std::string avro_schema_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {
+ "name": "value",
+ "type": {
+ "type": "fixed",
+ "name": "decimal_9_2",
+ "size": 4,
+ "logicalType": "decimal",
+ "precision": 9,
+ "scale": 2
+ },
+ "field-id": 1
+ }
+ ]
+ })";
+ auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json);
+
+ auto projection_result =
+ Project(expected_schema, avro_schema.root(), /*prune_source=*/false);
+ ASSERT_THAT(projection_result, IsOk());  // precision 9 -> 18 with equal scale is accepted
+
+ const auto& projection = *projection_result;
+ ASSERT_EQ(projection.fields.size(), 1);
+ ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected);
+ ASSERT_EQ(std::get<1>(projection.fields[0].from), 0);  // alternative 1 of `from` is presumably the source field position -- confirm against FieldProjection
+}
+
+TEST(AvroSchemaProjectionTest, ProjectDecimalIncompatible) {  // Projection must fail when the expected decimal scale differs from the Avro decimal scale.
+ // Expected Iceberg schema: required decimal(18, 3) field "value" (field id 1); the scale (3) differs from the Avro scale (2) below.
+ Schema expected_schema({
+ SchemaField::MakeRequired(/*field_id=*/1, "value",
+ std::make_shared<DecimalType>(18, 3)),
+ });
+
+ // Avro side: a 4-byte fixed annotated as decimal with precision 9 and scale 2.
+ std::string avro_schema_json = R"({
+ "type": "record",
+ "name": "iceberg_schema",
+ "fields": [
+ {
+ "name": "value",
+ "type": {
+ "type": "fixed",
+ "name": "decimal_9_2",
+ "size": 4,
+ "logicalType": "decimal",
+ "precision": 9,
+ "scale": 2
+ },
+ "field-id": 1
+ }
+ ]
+ })";
+ auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json);
+
+ auto projection_result =
+ Project(expected_schema, avro_schema.root(), /*prune_source=*/false);
+ ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema));  // the mismatch is reported as an invalid-schema error
+ ASSERT_THAT(projection_result, HasErrorMessage("Cannot read"));  // presumably a substring match on the error text -- confirm against the matcher
+}
+
} // namespace iceberg::avro