KevinJiao opened a new issue, #2672:
URL: https://github.com/apache/iceberg-python/issues/2672

   ### Apache Iceberg version
   
   0.10.0 (latest release)
   
   ### Please describe the bug 🐞
   
   PyIceberg raises `ValueError: Could not find field with id: X` when reading a partitioned table with column projection after schema evolution. The error occurs when all of the following hold:
   
   1. The table is partitioned.
   2. The query uses column projection.
   3. The partition field is NOT included in the selected columns.
   4. The schema has evolved.
   5. A data file in an older partition is missing a field that the query requests.
   
   My hunch at the root cause:
   
   In `_get_column_projection_values` (in `pyiceberg/io/pyarrow.py`), the partition type is derived from the projected schema by calling `partition_spec.partition_type(projected_schema)`. This lookup fails when the projected schema does not contain the partition's source field.
   I think this can be fixed by passing the full table schema, instead of `projected_schema`, to `partition_spec.partition_type(schema)`. I'd be happy to submit a patch if this is regarded as the correct approach.
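
The suspected failure mode can be illustrated without PyIceberg. The sketch below is a toy model, not PyIceberg's implementation: `Field`, `Schema`, `PartitionField`, and `partition_type` are simplified, hypothetical stand-ins that only mimic the lookup by field ID seen in the traceback, and the field ID 5 for `new_column` is an assumption. It shows that resolving the partition type against a projected schema that lacks the partition source field raises the same kind of error, while resolving against the full table schema succeeds.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    """Hypothetical stand-in for a schema field (not pyiceberg.types.NestedField)."""
    field_id: int
    name: str
    type_name: str


class Schema:
    """Toy schema: a lookup of fields by ID (not pyiceberg.schema.Schema)."""

    def __init__(self, *fields: Field) -> None:
        self._by_id = {f.field_id: f for f in fields}

    def find_type(self, field_id: int) -> str:
        # Mirrors the error message from the traceback for a missing ID.
        if field_id not in self._by_id:
            raise ValueError(f"Could not find field with id: {field_id}")
        return self._by_id[field_id].type_name

    def select(self, *names: str) -> "Schema":
        # Column projection: keep only the named fields.
        return Schema(*(f for f in self._by_id.values() if f.name in names))


@dataclass(frozen=True)
class PartitionField:
    """Hypothetical stand-in for a partition field with an identity transform."""
    source_id: int
    name: str


def partition_type(partition_fields, schema):
    """Resolve each partition field's source type from the given schema."""
    return {pf.name: schema.find_type(pf.source_id) for pf in partition_fields}


table_schema = Schema(
    Field(1, "partition_date", "date"),
    Field(2, "id", "int"),
    Field(3, "name", "string"),
    Field(4, "value", "int"),
    Field(5, "new_column", "string"),  # assumed ID for the evolved column
)
spec = [PartitionField(source_id=1, name="partition_date")]

# The projection omits the partition source column, as in "Test 2".
projected_schema = table_schema.select("id", "name", "value", "new_column")

try:
    partition_type(spec, projected_schema)
    error_message = None
except ValueError as exc:
    error_message = str(exc)

# Resolving against the full table schema finds the source field.
resolved = partition_type(spec, table_schema)

print(error_message)
print(resolved)
```

In this model the projected lookup fails on source ID 1, which is the shape of the proposed fix: the partition type depends on the table schema, not on which columns the query happens to select.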
   
   Reproduction script:
   ```python
   import tempfile
   from datetime import date
   from pathlib import Path
   
   import pyarrow as pa
   from pyiceberg.catalog.sql import SqlCatalog
   from pyiceberg.partitioning import PartitionField, PartitionSpec
   from pyiceberg.schema import Schema
   from pyiceberg.transforms import IdentityTransform
   from pyiceberg.types import DateType, IntegerType, NestedField, StringType
   
   temp_dir = tempfile.mkdtemp()
   warehouse_path = Path(temp_dir) / "warehouse"
   warehouse_path.mkdir()
   catalog_db = Path(temp_dir) / "catalog.db"
   
   catalog = SqlCatalog(
       "test_catalog",
       **{
           "uri": f"sqlite:///{catalog_db}",
           "warehouse": f"file://{warehouse_path}",
       }
   )
   catalog.create_namespace("default")
   
   # Define initial schema
   initial_schema = Schema(
       NestedField(field_id=1, name="partition_date", field_type=DateType(), required=False),
       NestedField(field_id=2, name="id", field_type=IntegerType(), required=False),
       NestedField(field_id=3, name="name", field_type=StringType(), required=False),
       NestedField(field_id=4, name="value", field_type=IntegerType(), required=False),
   )
   
   partition_spec = PartitionSpec(
       PartitionField(
           source_id=1,
           field_id=1000,
           transform=IdentityTransform(),
           name="partition_date"
       )
   )
   
   table = catalog.create_table(
       "default.test_table",
       schema=initial_schema,
       partition_spec=partition_spec,
   )
   
   data_v1 = pa.Table.from_pylist(
       [
           {"partition_date": date(2024, 1, 1), "id": 1, "name": "Alice", "value": 100},
       ],
       schema=pa.schema([
           ("partition_date", pa.date32()),
           ("id", pa.int32()),
           ("name", pa.string()),
           ("value", pa.int32()),
       ])
   )
   
   table.append(data_v1)
   
   # Evolve schema: Add 'new_column' field
   with table.update_schema() as update:
       update.add_column("new_column", StringType())
   
   table = catalog.load_table("default.test_table")
   
   data_v2 = pa.Table.from_pylist(
       [
           {"partition_date": date(2024, 1, 2), "id": 2, "name": "Bob", "value": 200, "new_column": "new_value"},
       ],
       schema=pa.schema([
           ("partition_date", pa.date32()),
           ("id", pa.int32()),
           ("name", pa.string()),
           ("value", pa.int32()),
           ("new_column", pa.string()),
       ])
   )
   
   table.append(data_v2)
   
   # Test 1: Query with all fields (should work)
   scan = table.scan(selected_fields=("partition_date", "id", "name", "value", "new_column"))
   result = scan.to_arrow()
   
   # Test 2: Query WITHOUT partition field but requesting new_column
   # This triggers the bug when reading partition 2024-01-01
   scan2 = table.scan(selected_fields=("id", "name", "value", "new_column"))
   result2 = scan2.to_arrow()
   ```
   
   Traceback:
   ```
   Traceback (most recent call last):
     File "/mnt/home/kevin.jiao/repos/research/reproduce_pyiceberg_bug_minimal.py", line 92, in <module>
       result2 = scan2.to_arrow()
                 ^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/table/__init__.py", line 1989, in to_arrow
       ).to_table(self.plan_files())
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1664, in to_table
       result = pa.concat_tables(
                ^^^^^^^^^^^^^^^^^
     File "pyarrow/table.pxi", line 6307, in pyarrow.lib.concat_tables
     File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1665, in <genexpr>
       (pa.Table.from_batches([batch]) for batch in itertools.chain([first_batch], batches)), promote_options="permissive"
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1708, in to_record_batches
       for batches in executor.map(batches_for_task, tasks):
     File "/mnt/home/kevin.jiao/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 619, in result_iterator
       yield _result_or_cancel(fs.pop())
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 317, in _result_or_cancel
       return fut.result(timeout)
              ^^^^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 449, in result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
       raise self._exception
     File "/mnt/home/kevin.jiao/.pyenv/versions/3.11.9/lib/python3.11/concurrent/futures/thread.py", line 58, in run
       result = self.fn(*self.args, **self.kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1705, in batches_for_task
       return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1744, in _record_batches_from_scan_tasks_and_deletes
       for batch in batches:
     File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1513, in _task_to_record_batches
       projected_missing_fields = _get_column_projection_values(
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/io/pyarrow.py", line 1472, in _get_column_projection_values
       partition_schema = partition_spec.partition_type(projected_schema)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/partitioning.py", line 226, in partition_type
       source_type = schema.find_type(field.source_id)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/schema.py", line 235, in find_type
       field = self.find_field(name_or_id=name_or_id, case_sensitive=case_sensitive)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/mnt/home/kevin.jiao/repos/research/.venv/lib/python3.11/site-packages/pyiceberg/schema.py", line 212, in find_field
       raise ValueError(f"Could not find field with id: {name_or_id}")
   ```
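
Since the error only appears when the partition field is left out of the selection, a possible interim workaround is to request the partition source columns as well and drop them afterwards. This is an untested assumption drawn from the conditions listed in this report, not a confirmed fix. The helper `with_partition_columns` below is hypothetical and only builds the widened selection.

```python
def with_partition_columns(selected_fields, partition_columns):
    """Return selected_fields plus any partition columns not already selected.

    Hypothetical helper: the original order is preserved and the missing
    partition source columns are appended at the end.
    """
    extra = tuple(c for c in partition_columns if c not in selected_fields)
    return tuple(selected_fields) + extra


wanted = ("id", "name", "value", "new_column")
scan_fields = with_partition_columns(wanted, ("partition_date",))
print(scan_fields)
```

With the reproduction script, `scan_fields` could be passed as `selected_fields` to `table.scan(...)`, and the resulting Arrow table narrowed back with `result.select(list(wanted))`.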
   
   ### Willingness to contribute
   
   - [x] I can contribute a fix for this bug independently
   - [x] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]