rdblue commented on code in PR #5011: URL: https://github.com/apache/iceberg/pull/5011#discussion_r907812542
########## python/src/iceberg/table/metadata.py: ########## @@ -0,0 +1,360 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from copy import copy +from typing import ( + Any, + Dict, + List, + Literal, + Optional, + Union, +) +from uuid import UUID, uuid4 + +from pydantic import Field, root_validator + +from iceberg.exceptions import ValidationError +from iceberg.schema import Schema +from iceberg.table.refs import MAIN_BRANCH, SnapshotRef, SnapshotRefType +from iceberg.utils.iceberg_base_model import IcebergBaseModel + +_INITIAL_SEQUENCE_NUMBER = 0 +INITIAL_SPEC_ID = 0 +DEFAULT_SCHEMA_ID = 0 +DEFAULT_SORT_ORDER_UNSORTED = 0 + + +def check_schemas(values: Dict[str, Any]) -> Dict[str, Any]: + """Validator to check if the current-schema-id is actually present in schemas""" + current_schema_id = values["current_schema_id"] + + for schema in values["schemas"]: + if schema.schema_id == current_schema_id: + return values + + raise ValidationError(f"current-schema-id {current_schema_id} can't be found in the schemas") + + +def check_partition_specs(values: Dict[str, Any]) -> Dict[str, Any]: + """Validator to check if the default-spec-id is present in partition-specs""" + default_spec_id = values["default_spec_id"] + + for spec in values["partition_specs"]: + if spec["spec-id"] == default_spec_id: + return values + + raise ValidationError(f"default-spec-id {default_spec_id} can't be found") + + +def check_sort_orders(values: Dict[str, Any]) -> Dict[str, Any]: + """Validator to check if the default_sort_order_id is present in sort-orders""" + default_sort_order_id = values["default_sort_order_id"] + + # 0 == unsorted + if default_sort_order_id != 0: + for sort in values["sort_orders"]: + if sort["order-id"] == default_sort_order_id: + return values + + raise ValidationError(f"default-sort-order-id {default_sort_order_id} can't be found") + return values + + +class TableMetadataCommonFields(IcebergBaseModel): + """Metadata for an Iceberg table as specified in the Apache Iceberg + spec (https://iceberg.apache.org/spec/#iceberg-table-spec)""" + + @root_validator(skip_on_failure=True) + def construct_refs(cls, data: Dict[str, Any]): + # This is going to be much nicer as soon as refs is an actual pydantic object + if not data.get("refs"): + if current_snapshot_id := data.get("current_snapshot_id"): + if current_snapshot_id != -1: + data["refs"] = { + MAIN_BRANCH: SnapshotRef(snapshot_id=current_snapshot_id, snapshot_ref_type=SnapshotRefType.branch) + } + return data + + location: str = Field() + """The table’s base location. This is used by writers to determine where + to store data files, manifest files, and table metadata files.""" + + last_updated_ms: int = Field(alias="last-updated-ms") + """Timestamp in milliseconds from the unix epoch when the table + was last updated. Each table metadata file should update this + field just before writing.""" + + last_column_id: int = Field(alias="last-column-id") + """An integer; the highest assigned column ID for the table. + This is used to ensure fields are always assigned an unused ID + when evolving schemas.""" + + schemas: List[Schema] = Field(default_factory=list) + """A list of schemas, stored as objects with schema-id.""" + + current_schema_id: int = Field(alias="current-schema-id", default=0) + """ID of the table’s current schema.""" + + partition_specs: list = Field(alias="partition-specs", default_factory=list) + """A list of partition specs, stored as full partition spec objects.""" + + default_spec_id: int = Field(alias="default-spec-id") + """ID of the “current” spec that writers should use by default.""" + + last_partition_id: int = Field(alias="last-partition-id") + """An integer; the highest assigned partition field ID across all + partition specs for the table. This is used to ensure partition fields + are always assigned an unused ID when evolving specs.""" + + properties: Dict[str, str] = Field(default_factory=dict) + """ A string to string map of table properties. This is used to + control settings that affect reading and writing and is not intended + to be used for arbitrary metadata. For example, commit.retry.num-retries + is used to control the number of commit retries.""" + + current_snapshot_id: Optional[int] = Field(alias="current-snapshot-id", default=-1) Review Comment: Non-blocker: we use -1 in the format, but if I could do it over, I would probably avoid it and use `null` / `None` instead. You may want to consider converting in a pre-validation from -1 to clean this up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
