This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new dba1ef8  Correct schema behavior (#247)
dba1ef8 is described below

commit dba1ef8d85b438d4977586998c99a9dab369491c
Author: Fokko Driesprong <[email protected]>
AuthorDate: Fri Jan 5 08:43:55 2024 +0100

    Correct schema behavior (#247)
    
    * Correct schema behavior
    
    When we alter the schema, we want to use the latest
    schema by default, except when you select a specific
    snapshot that has a schema-id.
    
    * Add warning if schema-id is missing from the metadata
    
    * Catch unexisting snapshots
---
 pyiceberg/table/__init__.py | 21 ++++++++---
 tests/table/test_init.py    | 89 ++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index fa04fb7..1a4c39c 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 import datetime
 import itertools
 import uuid
+import warnings
 from abc import ABC, abstractmethod
 from copy import copy
 from dataclasses import dataclass
@@ -942,15 +943,23 @@ class TableScan(ABC):
         return self.table.current_snapshot()
 
     def projection(self) -> Schema:
-        snapshot_schema = self.table.schema()
-        if snapshot := self.snapshot():
-            if snapshot.schema_id is not None:
-                snapshot_schema = self.table.schemas()[snapshot.schema_id]
+        current_schema = self.table.schema()
+        if self.snapshot_id is not None:
+            snapshot = self.table.snapshot_by_id(self.snapshot_id)
+            if snapshot is not None:
+                if snapshot.schema_id is not None:
+                    snapshot_schema = 
self.table.schemas().get(snapshot.schema_id)
+                    if snapshot_schema is not None:
+                        current_schema = snapshot_schema
+                    else:
+                        warnings.warn(f"Metadata does not contain schema with 
id: {snapshot.schema_id}")
+            else:
+                raise ValueError(f"Snapshot not found: {self.snapshot_id}")
 
         if "*" in self.selected_fields:
-            return snapshot_schema
+            return current_schema
 
-        return snapshot_schema.select(*self.selected_fields, 
case_sensitive=self.case_sensitive)
+        return current_schema.select(*self.selected_fields, 
case_sensitive=self.case_sensitive)
 
     @abstractmethod
     def plan_files(self) -> Iterable[ScanTask]:
diff --git a/tests/table/test_init.py b/tests/table/test_init.py
index 04d467c..547990c 100644
--- a/tests/table/test_init.py
+++ b/tests/table/test_init.py
@@ -22,6 +22,7 @@ from typing import Dict
 import pytest
 from sortedcontainers import SortedList
 
+from pyiceberg.catalog.noop import NoopCatalog
 from pyiceberg.exceptions import CommitFailedException
 from pyiceberg.expressions import (
     AlwaysTrue,
@@ -29,7 +30,7 @@ from pyiceberg.expressions import (
     EqualTo,
     In,
 )
-from pyiceberg.io import PY_IO_IMPL
+from pyiceberg.io import PY_IO_IMPL, load_file_io
 from pyiceberg.manifest import (
     DataFile,
     DataFileContent,
@@ -848,3 +849,89 @@ def test_assert_default_sort_order_id(table_v2: Table) -> 
None:
         match="Requirement failed: default sort order id has changed: expected 
1, found 3",
     ):
         
AssertDefaultSortOrderId(default_sort_order_id=1).validate(base_metadata)
+
+
+def test_correct_schema() -> None:
+    table_metadata = TableMetadataV2(
+        **{
+            "format-version": 2,
+            "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+            "location": "s3://bucket/test/location",
+            "last-sequence-number": 34,
+            "last-updated-ms": 1602638573590,
+            "last-column-id": 3,
+            "current-schema-id": 1,
+            "schemas": [
+                {"type": "struct", "schema-id": 0, "fields": [{"id": 1, 
"name": "x", "required": True, "type": "long"}]},
+                {
+                    "type": "struct",
+                    "schema-id": 1,
+                    "identifier-field-ids": [1, 2],
+                    "fields": [
+                        {"id": 1, "name": "x", "required": True, "type": 
"long"},
+                        {"id": 2, "name": "y", "required": True, "type": 
"long"},
+                        {"id": 3, "name": "z", "required": True, "type": 
"long"},
+                    ],
+                },
+            ],
+            "default-spec-id": 0,
+            "partition-specs": [
+                {"spec-id": 0, "fields": [{"name": "x", "transform": 
"identity", "source-id": 1, "field-id": 1000}]}
+            ],
+            "last-partition-id": 1000,
+            "default-sort-order-id": 0,
+            "sort-orders": [],
+            "current-snapshot-id": 123,
+            "snapshots": [
+                {
+                    "snapshot-id": 234,
+                    "timestamp-ms": 1515100955770,
+                    "sequence-number": 0,
+                    "summary": {"operation": "append"},
+                    "manifest-list": "s3://a/b/1.avro",
+                    "schema-id": 10,
+                },
+                {
+                    "snapshot-id": 123,
+                    "timestamp-ms": 1515100955770,
+                    "sequence-number": 0,
+                    "summary": {"operation": "append"},
+                    "manifest-list": "s3://a/b/1.avro",
+                    "schema-id": 0,
+                },
+            ],
+        }
+    )
+
+    t = Table(
+        identifier=("default", "t1"),
+        metadata=table_metadata,
+        metadata_location="s3://../..",
+        io=load_file_io(),
+        catalog=NoopCatalog("NoopCatalog"),
+    )
+
+    # Should use the current schema, instead the one from the snapshot
+    assert t.scan().projection() == Schema(
+        NestedField(field_id=1, name='x', field_type=LongType(), 
required=True),
+        NestedField(field_id=2, name='y', field_type=LongType(), 
required=True),
+        NestedField(field_id=3, name='z', field_type=LongType(), 
required=True),
+        schema_id=1,
+        identifier_field_ids=[1, 2],
+    )
+
+    # When we explicitly filter on the commit, we want to have the schema 
that's linked to the snapshot
+    assert t.scan(snapshot_id=123).projection() == Schema(
+        NestedField(field_id=1, name='x', field_type=LongType(), 
required=True),
+        schema_id=0,
+        identifier_field_ids=[],
+    )
+
+    with pytest.warns(UserWarning, match="Metadata does not contain schema 
with id: 10"):
+        t.scan(snapshot_id=234).projection()
+
+    # Invalid snapshot
+    with pytest.raises(ValueError) as exc_info:
+        _ = t.scan(snapshot_id=-1).projection()
+
+    assert "Snapshot not found: -1" in str(exc_info.value)

Reply via email to