mapleFU commented on code in PR #159: URL: https://github.com/apache/iceberg-cpp/pull/159#discussion_r2254084748
########## src/iceberg/parquet/parquet_schema_util.cc: ########## @@ -17,20 +17,392 @@ * under the License. */ +#include <arrow/type.h> +#include <arrow/type_fwd.h> +#include <arrow/util/key_value_metadata.h> +#include <parquet/arrow/schema.h> #include <parquet/schema.h> +#include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/result.h" +#include "iceberg/schema_util_internal.h" #include "iceberg/util/checked_cast.h" +#include "iceberg/util/formatter.h" +#include "iceberg/util/macros.h" namespace iceberg::parquet { +namespace { + +constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; + +std::optional<int32_t> FieldIdFromMetadata( + const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) { + if (!metadata) { + return std::nullopt; + } + int key = metadata->FindKey(kParquetFieldIdKey.data()); + if (key < 0) { + return std::nullopt; + } + std::string field_id_str = metadata->value(key); + int32_t field_id = -1; + try { + field_id = std::stoi(field_id_str); + } catch (const std::invalid_argument& e) { + return std::nullopt; + } catch (const std::out_of_range& e) { + return std::nullopt; + } + return field_id < 0 ? 
std::nullopt : std::make_optional(field_id); +} + +std::optional<int32_t> GetFieldId(const ::parquet::arrow::SchemaField& parquet_field) { + return FieldIdFromMetadata(parquet_field.field->metadata()); +} + +Status ValidateParquetSchemaEvolution( + const Type& expected_type, const ::parquet::arrow::SchemaField& parquet_field) { + const auto& arrow_type = parquet_field.field->type(); + switch (expected_type.type_id()) { + case TypeId::kBoolean: + if (arrow_type->id() == ::arrow::Type::BOOL) { + return {}; + } + break; + case TypeId::kInt: + if (arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kLong: + if (arrow_type->id() == ::arrow::Type::INT64 || + arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kFloat: + if (arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDouble: + if (arrow_type->id() == ::arrow::Type::DOUBLE || + arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDate: + if (arrow_type->id() == ::arrow::Type::DATE32) { + return {}; + } + break; + case TypeId::kTime: + if (arrow_type->id() == ::arrow::Type::TIME64) { + return {}; + } + break; + case TypeId::kTimestamp: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kTimestampTz: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + !timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kString: + if (arrow_type->id() == ::arrow::Type::STRING) { + return {}; + } + break; + case TypeId::kBinary: + if (arrow_type->id() == 
::arrow::Type::BINARY) { + return {}; + } + break; + case TypeId::kDecimal: + if (arrow_type->id() == ::arrow::Type::DECIMAL128) { + const auto& decimal_type = + internal::checked_cast<const DecimalType&>(expected_type); + const auto& arrow_decimal = + internal::checked_cast<const ::arrow::Decimal128Type&>(*arrow_type); + if (decimal_type.scale() == arrow_decimal.scale() && + decimal_type.precision() >= arrow_decimal.precision()) { + return {}; + } + } + break; + case TypeId::kUuid: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast<const ::arrow::FixedSizeBinaryType&>(*arrow_type); + if (fixed_binary.byte_width() == 16) { + return {}; + } + } + break; + case TypeId::kFixed: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast<const ::arrow::FixedSizeBinaryType&>(*arrow_type); + if (fixed_binary.byte_width() == + internal::checked_cast<const FixedType&>(expected_type).length()) { + return {}; + } + } + break; + case TypeId::kStruct: + if (arrow_type->id() == ::arrow::Type::STRUCT) { + return {}; + } + break; + case TypeId::kList: + if (arrow_type->id() == ::arrow::Type::LIST) { + return {}; + } + break; + case TypeId::kMap: + if (arrow_type->id() == ::arrow::Type::MAP) { + return {}; + } + break; + default: + break; + } + + return InvalidSchema("Cannot read Iceberg type: {} from Parquet type: {}", + expected_type, arrow_type->ToString()); +} + +// Forward declaration +Result<FieldProjection> ProjectNested( + const Type& expected_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields); + +Result<FieldProjection> ProjectStruct( + const StructType& struct_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + struct FieldContext { + size_t local_index; + const ::parquet::arrow::SchemaField& parquet_field; + }; + std::unordered_map<int32_t, FieldContext> field_context_map; + 
field_context_map.reserve(parquet_fields.size()); + + for (size_t i = 0; i < parquet_fields.size(); ++i) { + const ::parquet::arrow::SchemaField& parquet_field = parquet_fields[i]; + auto field_id = GetFieldId(parquet_field); + if (!field_id) { + continue; + } + if (const auto [iter, inserted] = field_context_map.emplace( + std::piecewise_construct, std::forward_as_tuple(field_id.value()), + std::forward_as_tuple(i, parquet_field)); + !inserted) [[unlikely]] { + return InvalidSchema("Duplicate field id found in Parquet schema: {}", Review Comment: ```suggestion return InvalidSchema("Duplicate field id '{}' found in Parquet schema", ``` Also, could the schema be included in the message as a string? ########## src/iceberg/parquet/parquet_schema_util.cc: ########## @@ -17,20 +17,392 @@ * under the License. */ +#include <arrow/type.h> +#include <arrow/type_fwd.h> +#include <arrow/util/key_value_metadata.h> +#include <parquet/arrow/schema.h> #include <parquet/schema.h> +#include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/result.h" +#include "iceberg/schema_util_internal.h" #include "iceberg/util/checked_cast.h" +#include "iceberg/util/formatter.h" +#include "iceberg/util/macros.h" namespace iceberg::parquet { +namespace { + +constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; + +std::optional<int32_t> FieldIdFromMetadata( + const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) { + if (!metadata) { + return std::nullopt; + } + int key = metadata->FindKey(kParquetFieldIdKey.data()); + if (key < 0) { + return std::nullopt; + } + std::string field_id_str = metadata->value(key); + int32_t field_id = -1; + try { + field_id = std::stoi(field_id_str); + } catch (const std::invalid_argument& e) { + return std::nullopt; + } catch (const std::out_of_range& e) { + return std::nullopt; + } + return field_id < 0 ? 
std::nullopt : std::make_optional(field_id); +} + +std::optional<int32_t> GetFieldId(const ::parquet::arrow::SchemaField& parquet_field) { + return FieldIdFromMetadata(parquet_field.field->metadata()); +} + +Status ValidateParquetSchemaEvolution( + const Type& expected_type, const ::parquet::arrow::SchemaField& parquet_field) { + const auto& arrow_type = parquet_field.field->type(); + switch (expected_type.type_id()) { + case TypeId::kBoolean: + if (arrow_type->id() == ::arrow::Type::BOOL) { + return {}; + } + break; + case TypeId::kInt: + if (arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kLong: + if (arrow_type->id() == ::arrow::Type::INT64 || + arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kFloat: + if (arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDouble: + if (arrow_type->id() == ::arrow::Type::DOUBLE || + arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDate: + if (arrow_type->id() == ::arrow::Type::DATE32) { + return {}; + } + break; + case TypeId::kTime: + if (arrow_type->id() == ::arrow::Type::TIME64) { + return {}; + } + break; + case TypeId::kTimestamp: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kTimestampTz: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + !timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kString: + if (arrow_type->id() == ::arrow::Type::STRING) { + return {}; + } + break; + case TypeId::kBinary: + if (arrow_type->id() == 
::arrow::Type::BINARY) { + return {}; + } + break; + case TypeId::kDecimal: + if (arrow_type->id() == ::arrow::Type::DECIMAL128) { + const auto& decimal_type = + internal::checked_cast<const DecimalType&>(expected_type); + const auto& arrow_decimal = + internal::checked_cast<const ::arrow::Decimal128Type&>(*arrow_type); + if (decimal_type.scale() == arrow_decimal.scale() && + decimal_type.precision() >= arrow_decimal.precision()) { + return {}; + } + } + break; + case TypeId::kUuid: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast<const ::arrow::FixedSizeBinaryType&>(*arrow_type); + if (fixed_binary.byte_width() == 16) { + return {}; + } + } + break; + case TypeId::kFixed: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast<const ::arrow::FixedSizeBinaryType&>(*arrow_type); + if (fixed_binary.byte_width() == + internal::checked_cast<const FixedType&>(expected_type).length()) { + return {}; + } + } + break; + case TypeId::kStruct: + if (arrow_type->id() == ::arrow::Type::STRUCT) { + return {}; + } + break; + case TypeId::kList: + if (arrow_type->id() == ::arrow::Type::LIST) { + return {}; + } + break; + case TypeId::kMap: + if (arrow_type->id() == ::arrow::Type::MAP) { + return {}; + } + break; + default: + break; + } + + return InvalidSchema("Cannot read Iceberg type: {} from Parquet type: {}", + expected_type, arrow_type->ToString()); +} + +// Forward declaration +Result<FieldProjection> ProjectNested( + const Type& expected_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields); + +Result<FieldProjection> ProjectStruct( + const StructType& struct_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + struct FieldContext { + size_t local_index; + const ::parquet::arrow::SchemaField& parquet_field; + }; + std::unordered_map<int32_t, FieldContext> field_context_map; + 
field_context_map.reserve(parquet_fields.size()); + + for (size_t i = 0; i < parquet_fields.size(); ++i) { + const ::parquet::arrow::SchemaField& parquet_field = parquet_fields[i]; + auto field_id = GetFieldId(parquet_field); + if (!field_id) { + continue; + } + if (const auto [iter, inserted] = field_context_map.emplace( + std::piecewise_construct, std::forward_as_tuple(field_id.value()), + std::forward_as_tuple(i, parquet_field)); + !inserted) [[unlikely]] { + return InvalidSchema("Duplicate field id found in Parquet schema: {}", + field_id.value()); + } + } + + FieldProjection result; + result.children.reserve(struct_type.fields().size()); + + for (const auto& expected_field : struct_type.fields()) { + int32_t field_id = expected_field.field_id(); + FieldProjection child_projection; + + if (auto iter = field_context_map.find(field_id); iter != field_context_map.cend()) { + const auto& parquet_field = iter->second.parquet_field; + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*expected_field.type(), parquet_field)); + if (expected_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(child_projection, ProjectNested(*expected_field.type(), + parquet_field.children)); + } else { + child_projection.attributes = + std::make_shared<ParquetExtraAttributes>(parquet_field.column_index); + } + child_projection.from = iter->second.local_index; + child_projection.kind = FieldProjection::Kind::kProjected; + } else if (MetadataColumns::IsMetadataColumn(field_id)) { + child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (expected_field.optional()) { + child_projection.kind = FieldProjection::Kind::kNull; + } else { + return InvalidSchema("Missing required field with id: {}", field_id); + } + + result.children.emplace_back(std::move(child_projection)); + } + + PruneFieldProjection(result); + return result; +} + +Result<FieldProjection> ProjectList( + const ListType& list_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { 
+ if (parquet_fields.size() != 1) { + return InvalidSchema("List type must have exactly one field, got {}", + parquet_fields.size()); + } + + const auto& parquet_field = parquet_fields[0]; + auto element_field_id = GetFieldId(parquet_field); + if (!element_field_id) { + return InvalidSchema("List element field missing field id"); + } + + const auto& expected_element_field = list_type.fields().back(); Review Comment: Why is this variable named "expected_element_field" rather than "element_field"? ########## src/iceberg/parquet/parquet_schema_util.cc: ########## @@ -17,20 +17,392 @@ * under the License. */ +#include <arrow/type.h> +#include <arrow/type_fwd.h> +#include <arrow/util/key_value_metadata.h> +#include <parquet/arrow/schema.h> #include <parquet/schema.h> +#include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/result.h" +#include "iceberg/schema_util_internal.h" #include "iceberg/util/checked_cast.h" +#include "iceberg/util/formatter.h" +#include "iceberg/util/macros.h" namespace iceberg::parquet { +namespace { + +constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; + +std::optional<int32_t> FieldIdFromMetadata( + const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) { + if (!metadata) { + return std::nullopt; + } + int key = metadata->FindKey(kParquetFieldIdKey.data()); + if (key < 0) { + return std::nullopt; + } + std::string field_id_str = metadata->value(key); + int32_t field_id = -1; + try { + field_id = std::stoi(field_id_str); + } catch (const std::invalid_argument& e) { + return std::nullopt; + } catch (const std::out_of_range& e) { + return std::nullopt; + } + return field_id < 0 ? 
std::nullopt : std::make_optional(field_id); +} + +std::optional<int32_t> GetFieldId(const ::parquet::arrow::SchemaField& parquet_field) { + return FieldIdFromMetadata(parquet_field.field->metadata()); +} + +Status ValidateParquetSchemaEvolution( + const Type& expected_type, const ::parquet::arrow::SchemaField& parquet_field) { + const auto& arrow_type = parquet_field.field->type(); + switch (expected_type.type_id()) { + case TypeId::kBoolean: + if (arrow_type->id() == ::arrow::Type::BOOL) { + return {}; + } + break; + case TypeId::kInt: + if (arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kLong: + if (arrow_type->id() == ::arrow::Type::INT64 || + arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kFloat: + if (arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDouble: + if (arrow_type->id() == ::arrow::Type::DOUBLE || + arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDate: + if (arrow_type->id() == ::arrow::Type::DATE32) { + return {}; + } + break; + case TypeId::kTime: + if (arrow_type->id() == ::arrow::Type::TIME64) { + return {}; + } + break; + case TypeId::kTimestamp: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kTimestampTz: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + !timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kString: + if (arrow_type->id() == ::arrow::Type::STRING) { + return {}; + } + break; + case TypeId::kBinary: + if (arrow_type->id() == 
::arrow::Type::BINARY) { + return {}; + } + break; + case TypeId::kDecimal: + if (arrow_type->id() == ::arrow::Type::DECIMAL128) { + const auto& decimal_type = + internal::checked_cast<const DecimalType&>(expected_type); + const auto& arrow_decimal = + internal::checked_cast<const ::arrow::Decimal128Type&>(*arrow_type); + if (decimal_type.scale() == arrow_decimal.scale() && + decimal_type.precision() >= arrow_decimal.precision()) { + return {}; + } + } + break; + case TypeId::kUuid: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast<const ::arrow::FixedSizeBinaryType&>(*arrow_type); + if (fixed_binary.byte_width() == 16) { + return {}; + } + } + break; + case TypeId::kFixed: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast<const ::arrow::FixedSizeBinaryType&>(*arrow_type); + if (fixed_binary.byte_width() == + internal::checked_cast<const FixedType&>(expected_type).length()) { + return {}; + } + } + break; + case TypeId::kStruct: + if (arrow_type->id() == ::arrow::Type::STRUCT) { + return {}; + } + break; + case TypeId::kList: + if (arrow_type->id() == ::arrow::Type::LIST) { + return {}; + } + break; + case TypeId::kMap: + if (arrow_type->id() == ::arrow::Type::MAP) { + return {}; + } + break; + default: + break; + } + + return InvalidSchema("Cannot read Iceberg type: {} from Parquet type: {}", + expected_type, arrow_type->ToString()); +} + +// Forward declaration +Result<FieldProjection> ProjectNested( + const Type& expected_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields); + +Result<FieldProjection> ProjectStruct( + const StructType& struct_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + struct FieldContext { + size_t local_index; + const ::parquet::arrow::SchemaField& parquet_field; + }; + std::unordered_map<int32_t, FieldContext> field_context_map; + 
field_context_map.reserve(parquet_fields.size()); + + for (size_t i = 0; i < parquet_fields.size(); ++i) { + const ::parquet::arrow::SchemaField& parquet_field = parquet_fields[i]; + auto field_id = GetFieldId(parquet_field); + if (!field_id) { + continue; + } + if (const auto [iter, inserted] = field_context_map.emplace( + std::piecewise_construct, std::forward_as_tuple(field_id.value()), + std::forward_as_tuple(i, parquet_field)); + !inserted) [[unlikely]] { + return InvalidSchema("Duplicate field id found in Parquet schema: {}", + field_id.value()); + } + } + + FieldProjection result; + result.children.reserve(struct_type.fields().size()); + + for (const auto& expected_field : struct_type.fields()) { + int32_t field_id = expected_field.field_id(); + FieldProjection child_projection; + + if (auto iter = field_context_map.find(field_id); iter != field_context_map.cend()) { + const auto& parquet_field = iter->second.parquet_field; + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*expected_field.type(), parquet_field)); + if (expected_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(child_projection, ProjectNested(*expected_field.type(), + parquet_field.children)); + } else { + child_projection.attributes = + std::make_shared<ParquetExtraAttributes>(parquet_field.column_index); + } + child_projection.from = iter->second.local_index; + child_projection.kind = FieldProjection::Kind::kProjected; + } else if (MetadataColumns::IsMetadataColumn(field_id)) { + child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (expected_field.optional()) { + child_projection.kind = FieldProjection::Kind::kNull; Review Comment: So only a struct can contain these `kMetadata` and `kNull` projections, and a list entry or map value cannot? ########## src/iceberg/parquet/parquet_schema_util.cc: ########## @@ -17,20 +17,392 @@ * under the License. 
*/ +#include <arrow/type.h> +#include <arrow/type_fwd.h> +#include <arrow/util/key_value_metadata.h> +#include <parquet/arrow/schema.h> #include <parquet/schema.h> +#include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/result.h" +#include "iceberg/schema_util_internal.h" #include "iceberg/util/checked_cast.h" +#include "iceberg/util/formatter.h" +#include "iceberg/util/macros.h" namespace iceberg::parquet { +namespace { + +constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; + +std::optional<int32_t> FieldIdFromMetadata( + const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) { + if (!metadata) { + return std::nullopt; + } + int key = metadata->FindKey(kParquetFieldIdKey.data()); + if (key < 0) { + return std::nullopt; + } + std::string field_id_str = metadata->value(key); + int32_t field_id = -1; + try { + field_id = std::stoi(field_id_str); + } catch (const std::invalid_argument& e) { + return std::nullopt; + } catch (const std::out_of_range& e) { + return std::nullopt; + } + return field_id < 0 ? 
std::nullopt : std::make_optional(field_id); +} + +std::optional<int32_t> GetFieldId(const ::parquet::arrow::SchemaField& parquet_field) { + return FieldIdFromMetadata(parquet_field.field->metadata()); +} + +Status ValidateParquetSchemaEvolution( + const Type& expected_type, const ::parquet::arrow::SchemaField& parquet_field) { + const auto& arrow_type = parquet_field.field->type(); + switch (expected_type.type_id()) { + case TypeId::kBoolean: + if (arrow_type->id() == ::arrow::Type::BOOL) { + return {}; + } + break; + case TypeId::kInt: + if (arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kLong: + if (arrow_type->id() == ::arrow::Type::INT64 || + arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kFloat: + if (arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDouble: + if (arrow_type->id() == ::arrow::Type::DOUBLE || + arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDate: + if (arrow_type->id() == ::arrow::Type::DATE32) { + return {}; + } + break; + case TypeId::kTime: + if (arrow_type->id() == ::arrow::Type::TIME64) { + return {}; + } + break; + case TypeId::kTimestamp: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kTimestampTz: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + !timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kString: + if (arrow_type->id() == ::arrow::Type::STRING) { + return {}; + } + break; + case TypeId::kBinary: + if (arrow_type->id() == 
::arrow::Type::BINARY) { + return {}; + } + break; + case TypeId::kDecimal: + if (arrow_type->id() == ::arrow::Type::DECIMAL128) { + const auto& decimal_type = + internal::checked_cast<const DecimalType&>(expected_type); + const auto& arrow_decimal = + internal::checked_cast<const ::arrow::Decimal128Type&>(*arrow_type); + if (decimal_type.scale() == arrow_decimal.scale() && + decimal_type.precision() >= arrow_decimal.precision()) { + return {}; + } + } + break; + case TypeId::kUuid: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast<const ::arrow::FixedSizeBinaryType&>(*arrow_type); + if (fixed_binary.byte_width() == 16) { + return {}; + } + } + break; + case TypeId::kFixed: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast<const ::arrow::FixedSizeBinaryType&>(*arrow_type); + if (fixed_binary.byte_width() == + internal::checked_cast<const FixedType&>(expected_type).length()) { + return {}; + } + } + break; + case TypeId::kStruct: + if (arrow_type->id() == ::arrow::Type::STRUCT) { + return {}; + } + break; + case TypeId::kList: + if (arrow_type->id() == ::arrow::Type::LIST) { + return {}; + } + break; + case TypeId::kMap: + if (arrow_type->id() == ::arrow::Type::MAP) { + return {}; + } + break; + default: + break; + } + + return InvalidSchema("Cannot read Iceberg type: {} from Parquet type: {}", + expected_type, arrow_type->ToString()); +} + +// Forward declaration +Result<FieldProjection> ProjectNested( + const Type& expected_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields); + +Result<FieldProjection> ProjectStruct( + const StructType& struct_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + struct FieldContext { + size_t local_index; + const ::parquet::arrow::SchemaField& parquet_field; + }; + std::unordered_map<int32_t, FieldContext> field_context_map; + 
field_context_map.reserve(parquet_fields.size()); + + for (size_t i = 0; i < parquet_fields.size(); ++i) { + const ::parquet::arrow::SchemaField& parquet_field = parquet_fields[i]; + auto field_id = GetFieldId(parquet_field); + if (!field_id) { + continue; + } + if (const auto [iter, inserted] = field_context_map.emplace( + std::piecewise_construct, std::forward_as_tuple(field_id.value()), + std::forward_as_tuple(i, parquet_field)); + !inserted) [[unlikely]] { + return InvalidSchema("Duplicate field id found in Parquet schema: {}", + field_id.value()); + } + } + + FieldProjection result; + result.children.reserve(struct_type.fields().size()); + + for (const auto& expected_field : struct_type.fields()) { + int32_t field_id = expected_field.field_id(); + FieldProjection child_projection; + + if (auto iter = field_context_map.find(field_id); iter != field_context_map.cend()) { + const auto& parquet_field = iter->second.parquet_field; + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*expected_field.type(), parquet_field)); + if (expected_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(child_projection, ProjectNested(*expected_field.type(), + parquet_field.children)); + } else { + child_projection.attributes = + std::make_shared<ParquetExtraAttributes>(parquet_field.column_index); + } + child_projection.from = iter->second.local_index; + child_projection.kind = FieldProjection::Kind::kProjected; + } else if (MetadataColumns::IsMetadataColumn(field_id)) { + child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (expected_field.optional()) { + child_projection.kind = FieldProjection::Kind::kNull; + } else { + return InvalidSchema("Missing required field with id: {}", field_id); + } + + result.children.emplace_back(std::move(child_projection)); + } + + PruneFieldProjection(result); + return result; +} + +Result<FieldProjection> ProjectList( + const ListType& list_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { 
+ if (parquet_fields.size() != 1) { + return InvalidSchema("List type must have exactly one field, got {}", + parquet_fields.size()); + } + + const auto& parquet_field = parquet_fields[0]; + auto element_field_id = GetFieldId(parquet_field); + if (!element_field_id) { + return InvalidSchema("List element field missing field id"); + } + + const auto& expected_element_field = list_type.fields().back(); + if (expected_element_field.field_id() != element_field_id.value()) { + return InvalidSchema("List element field id mismatch, expected {}, got {}", + expected_element_field.field_id(), element_field_id.value()); + } + + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*expected_element_field.type(), parquet_field)); + + FieldProjection element_projection; + if (expected_element_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE( + element_projection, + ProjectNested(*expected_element_field.type(), parquet_field.children)); + } else { + element_projection.attributes = + std::make_shared<ParquetExtraAttributes>(parquet_field.column_index); + } + + element_projection.kind = FieldProjection::Kind::kProjected; + element_projection.from = size_t{0}; + + FieldProjection result; + result.children.emplace_back(std::move(element_projection)); + return result; +} + +Result<FieldProjection> ProjectMap( + const MapType& map_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (parquet_fields.size() != 2) { + return InvalidSchema("Map type must have exactly two fields, got {}", + parquet_fields.size()); + } + + auto key_field_id = GetFieldId(parquet_fields[0]); + if (!key_field_id) { + return InvalidSchema("Map key field missing field id"); + } + auto value_field_id = GetFieldId(parquet_fields[1]); + if (!value_field_id) { + return InvalidSchema("Map value field missing field id"); + } + + const auto& expected_key_field = map_type.key(); + const auto& expected_value_field = map_type.value(); + if (expected_key_field.field_id() != 
key_field_id.value()) { + return InvalidSchema("Map key field id mismatch, expected {}, got {}", + expected_key_field.field_id(), key_field_id.value()); + } + if (expected_value_field.field_id() != value_field_id.value()) { + return InvalidSchema("Map value field id mismatch, expected {}, got {}", + expected_value_field.field_id(), value_field_id.value()); + } + + FieldProjection result; + Review Comment: Reserve a capacity of 2 for `children` here? ########## src/iceberg/parquet/parquet_schema_util.cc: ########## @@ -17,20 +17,392 @@ * under the License. */ +#include <arrow/type.h> +#include <arrow/type_fwd.h> +#include <arrow/util/key_value_metadata.h> +#include <parquet/arrow/schema.h> #include <parquet/schema.h> +#include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_schema_util_internal.h" +#include "iceberg/result.h" +#include "iceberg/schema_util_internal.h" #include "iceberg/util/checked_cast.h" +#include "iceberg/util/formatter.h" +#include "iceberg/util/macros.h" namespace iceberg::parquet { +namespace { + +constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; + +std::optional<int32_t> FieldIdFromMetadata( + const std::shared_ptr<const ::arrow::KeyValueMetadata>& metadata) { + if (!metadata) { + return std::nullopt; + } + int key = metadata->FindKey(kParquetFieldIdKey.data()); + if (key < 0) { + return std::nullopt; + } + std::string field_id_str = metadata->value(key); + int32_t field_id = -1; + try { + field_id = std::stoi(field_id_str); + } catch (const std::invalid_argument& e) { + return std::nullopt; + } catch (const std::out_of_range& e) { + return std::nullopt; + } + return field_id < 0 ? 
std::nullopt : std::make_optional(field_id); +} + +std::optional<int32_t> GetFieldId(const ::parquet::arrow::SchemaField& parquet_field) { + return FieldIdFromMetadata(parquet_field.field->metadata()); +} + +Status ValidateParquetSchemaEvolution( + const Type& expected_type, const ::parquet::arrow::SchemaField& parquet_field) { + const auto& arrow_type = parquet_field.field->type(); + switch (expected_type.type_id()) { + case TypeId::kBoolean: + if (arrow_type->id() == ::arrow::Type::BOOL) { + return {}; + } + break; + case TypeId::kInt: + if (arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kLong: + if (arrow_type->id() == ::arrow::Type::INT64 || + arrow_type->id() == ::arrow::Type::INT32) { + return {}; + } + break; + case TypeId::kFloat: + if (arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDouble: + if (arrow_type->id() == ::arrow::Type::DOUBLE || + arrow_type->id() == ::arrow::Type::FLOAT) { + return {}; + } + break; + case TypeId::kDate: + if (arrow_type->id() == ::arrow::Type::DATE32) { + return {}; + } + break; + case TypeId::kTime: + if (arrow_type->id() == ::arrow::Type::TIME64) { + return {}; + } + break; + case TypeId::kTimestamp: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kTimestampTz: + if (arrow_type->id() == ::arrow::Type::TIMESTAMP) { + const auto& timestamp_type = + internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type); + if (timestamp_type.unit() == ::arrow::TimeUnit::MICRO && + !timestamp_type.timezone().empty()) { + return {}; + } + } + break; + case TypeId::kString: + if (arrow_type->id() == ::arrow::Type::STRING) { + return {}; + } + break; + case TypeId::kBinary: + if (arrow_type->id() == 
::arrow::Type::BINARY) { + return {}; + } + break; + case TypeId::kDecimal: + if (arrow_type->id() == ::arrow::Type::DECIMAL128) { + const auto& decimal_type = + internal::checked_cast<const DecimalType&>(expected_type); + const auto& arrow_decimal = + internal::checked_cast<const ::arrow::Decimal128Type&>(*arrow_type); + if (decimal_type.scale() == arrow_decimal.scale() && + decimal_type.precision() >= arrow_decimal.precision()) { + return {}; + } + } + break; + case TypeId::kUuid: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast<const ::arrow::FixedSizeBinaryType&>(*arrow_type); + if (fixed_binary.byte_width() == 16) { + return {}; + } + } + break; + case TypeId::kFixed: + if (arrow_type->id() == ::arrow::Type::FIXED_SIZE_BINARY) { + const auto& fixed_binary = + internal::checked_cast<const ::arrow::FixedSizeBinaryType&>(*arrow_type); + if (fixed_binary.byte_width() == + internal::checked_cast<const FixedType&>(expected_type).length()) { + return {}; + } + } + break; + case TypeId::kStruct: + if (arrow_type->id() == ::arrow::Type::STRUCT) { + return {}; + } + break; + case TypeId::kList: + if (arrow_type->id() == ::arrow::Type::LIST) { + return {}; + } + break; + case TypeId::kMap: + if (arrow_type->id() == ::arrow::Type::MAP) { + return {}; + } + break; + default: + break; + } + + return InvalidSchema("Cannot read Iceberg type: {} from Parquet type: {}", + expected_type, arrow_type->ToString()); +} + +// Forward declaration +Result<FieldProjection> ProjectNested( + const Type& expected_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields); + +Result<FieldProjection> ProjectStruct( + const StructType& struct_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + struct FieldContext { + size_t local_index; + const ::parquet::arrow::SchemaField& parquet_field; + }; + std::unordered_map<int32_t, FieldContext> field_context_map; + 
field_context_map.reserve(parquet_fields.size()); + + for (size_t i = 0; i < parquet_fields.size(); ++i) { + const ::parquet::arrow::SchemaField& parquet_field = parquet_fields[i]; + auto field_id = GetFieldId(parquet_field); + if (!field_id) { + continue; + } + if (const auto [iter, inserted] = field_context_map.emplace( + std::piecewise_construct, std::forward_as_tuple(field_id.value()), + std::forward_as_tuple(i, parquet_field)); + !inserted) [[unlikely]] { + return InvalidSchema("Duplicate field id found in Parquet schema: {}", + field_id.value()); + } + } + + FieldProjection result; + result.children.reserve(struct_type.fields().size()); + + for (const auto& expected_field : struct_type.fields()) { + int32_t field_id = expected_field.field_id(); + FieldProjection child_projection; + + if (auto iter = field_context_map.find(field_id); iter != field_context_map.cend()) { + const auto& parquet_field = iter->second.parquet_field; + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*expected_field.type(), parquet_field)); + if (expected_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(child_projection, ProjectNested(*expected_field.type(), + parquet_field.children)); + } else { + child_projection.attributes = + std::make_shared<ParquetExtraAttributes>(parquet_field.column_index); + } + child_projection.from = iter->second.local_index; + child_projection.kind = FieldProjection::Kind::kProjected; + } else if (MetadataColumns::IsMetadataColumn(field_id)) { + child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (expected_field.optional()) { + child_projection.kind = FieldProjection::Kind::kNull; + } else { + return InvalidSchema("Missing required field with id: {}", field_id); + } + + result.children.emplace_back(std::move(child_projection)); + } + + PruneFieldProjection(result); + return result; +} + +Result<FieldProjection> ProjectList( + const ListType& list_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { 
+ if (parquet_fields.size() != 1) { + return InvalidSchema("List type must have exactly one field, got {}", + parquet_fields.size()); + } + + const auto& parquet_field = parquet_fields[0]; + auto element_field_id = GetFieldId(parquet_field); + if (!element_field_id) { + return InvalidSchema("List element field missing field id"); + } + + const auto& expected_element_field = list_type.fields().back(); + if (expected_element_field.field_id() != element_field_id.value()) { + return InvalidSchema("List element field id mismatch, expected {}, got {}", + expected_element_field.field_id(), element_field_id.value()); + } + + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*expected_element_field.type(), parquet_field)); + + FieldProjection element_projection; + if (expected_element_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE( + element_projection, + ProjectNested(*expected_element_field.type(), parquet_field.children)); + } else { + element_projection.attributes = + std::make_shared<ParquetExtraAttributes>(parquet_field.column_index); + } + + element_projection.kind = FieldProjection::Kind::kProjected; + element_projection.from = size_t{0}; + + FieldProjection result; + result.children.emplace_back(std::move(element_projection)); + return result; +} + +Result<FieldProjection> ProjectMap( + const MapType& map_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (parquet_fields.size() != 2) { + return InvalidSchema("Map type must have exactly two fields, got {}", + parquet_fields.size()); + } + + auto key_field_id = GetFieldId(parquet_fields[0]); + if (!key_field_id) { + return InvalidSchema("Map key field missing field id"); + } + auto value_field_id = GetFieldId(parquet_fields[1]); + if (!value_field_id) { + return InvalidSchema("Map value field missing field id"); + } + + const auto& expected_key_field = map_type.key(); + const auto& expected_value_field = map_type.value(); + if (expected_key_field.field_id() != 
key_field_id.value()) { + return InvalidSchema("Map key field id mismatch, expected {}, got {}", + expected_key_field.field_id(), key_field_id.value()); + } + if (expected_value_field.field_id() != value_field_id.value()) { + return InvalidSchema("Map value field id mismatch, expected {}, got {}", + expected_value_field.field_id(), value_field_id.value()); + } + + FieldProjection result; + + for (size_t i = 0; i < parquet_fields.size(); ++i) { + FieldProjection sub_projection; + const auto& sub_node = parquet_fields[i]; + const auto& expected_sub_field = map_type.fields()[i]; + ICEBERG_RETURN_UNEXPECTED( + ValidateParquetSchemaEvolution(*expected_sub_field.type(), sub_node)); + if (expected_sub_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE( + sub_projection, ProjectNested(*expected_sub_field.type(), sub_node.children)); + } else { + sub_projection.attributes = + std::make_shared<ParquetExtraAttributes>(sub_node.column_index); + } + sub_projection.kind = FieldProjection::Kind::kProjected; + sub_projection.from = i; + result.children.emplace_back(std::move(sub_projection)); + } + + return result; +} + +Result<FieldProjection> ProjectNested( + const Type& expected_type, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (!expected_type.is_nested()) { + return InvalidSchema("Expected a nested type, but got {}", expected_type); + } + + switch (expected_type.type_id()) { + case TypeId::kStruct: + return ProjectStruct(internal::checked_cast<const StructType&>(expected_type), + parquet_fields); + case TypeId::kList: + return ProjectList(internal::checked_cast<const ListType&>(expected_type), + parquet_fields); + case TypeId::kMap: + if (parquet_fields.size() != 1 || + parquet_fields[0].field->type()->id() != ::arrow::Type::STRUCT || + parquet_fields[0].children.size() != 2) { + return InvalidSchema( + "Map type must have exactly one struct field with two children"); + } + return ProjectMap(internal::checked_cast<const 
MapType&>(expected_type), + parquet_fields[0].children); + default: + return InvalidSchema("Unsupported nested type: {}", expected_type); + } +} + +void CollectColumnIds(const FieldProjection& field_projection, + std::vector<int32_t>* column_ids) { + if (field_projection.attributes) { + auto parquet_attributes = internal::checked_cast<const ParquetExtraAttributes&>( Review Comment: ```suggestion const auto& parquet_attributes = internal::checked_cast<const ParquetExtraAttributes&>( ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org