kevinjqliu commented on code in PR #2305:
URL: https://github.com/apache/iceberg-python/pull/2305#discussion_r2280604003


##########
tests/integration/test_partition_evolution.py:
##########
@@ -63,12 +64,19 @@ def _table_v2(catalog: Catalog) -> Table:
     return _create_table_with_schema(catalog, schema_with_timestamp, "2")
 
 
-def _create_table_with_schema(catalog: Catalog, schema: Schema, 
format_version: str) -> Table:
+def _create_table_with_schema(
+    catalog: Catalog, schema: Schema, format_version: str, partition_spec: 
Optional[PartitionSpec] = None
+) -> Table:
     tbl_name = "default.test_schema_evolution"
     try:
         catalog.drop_table(tbl_name)
     except NoSuchTableError:
         pass
+
+    if partition_spec:
+        return catalog.create_table(
+            identifier=tbl_name, schema=schema, partition_spec=partition_spec, 
properties={"format-version": format_version}
+        )
     return catalog.create_table(identifier=tbl_name, schema=schema, 
properties={"format-version": format_version})

Review Comment:
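   If `partition_spec` defaults to `UNPARTITIONED_PARTITION_SPEC` (see the suggestion on the function signature), the `if partition_spec:` branch is no longer needed and we can just do this: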
   ```suggestion
       return catalog.create_table(
           identifier=tbl_name, schema=schema, partition_spec=partition_spec, properties={"format-version": format_version}
       )
   ```



##########
pyiceberg/table/update/spec.py:
##########
@@ -174,16 +174,12 @@ def _commit(self) -> UpdatesAndRequirements:
         return updates, requirements
 
     def _apply(self) -> PartitionSpec:
-        def _check_and_add_partition_name(schema: Schema, name: str, 
source_id: int, partition_names: Set[str]) -> None:
-            try:
-                field = schema.find_field(name)
-            except ValueError:
-                field = None
-
-            if source_id is not None and field is not None and field.field_id 
!= source_id:
-                raise ValueError(f"Cannot create identity partition from a 
different field in the schema {name}")
-            elif field is not None and source_id != field.field_id:
-                raise ValueError(f"Cannot create partition from name that 
exists in schema {name}")
+        def _check_and_add_partition_name(
+            schema: Schema, name: str, source_id: int, transform: 
Transform[Any, Any], partition_names: Set[str]
+        ) -> None:
+            from pyiceberg.partitioning import validate_partition_name
+
+            validate_partition_name(name, transform, source_id, schema)
             if not name:

Review Comment:
   What do you think about moving L183-L186 into `validate_partition_name` to mirror the Java implementation?
   
   
https://github.com/apache/iceberg/blob/4dbc7f578eee7ceb9def35ebfa1a4cc236fb598f/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L412-L414



##########
tests/integration/test_partition_evolution.py:
##########
@@ -63,12 +64,19 @@ def _table_v2(catalog: Catalog) -> Table:
     return _create_table_with_schema(catalog, schema_with_timestamp, "2")
 
 
-def _create_table_with_schema(catalog: Catalog, schema: Schema, 
format_version: str) -> Table:
+def _create_table_with_schema(
+    catalog: Catalog, schema: Schema, format_version: str, partition_spec: 
Optional[PartitionSpec] = None
+) -> Table:

Review Comment:
   following other create table helpers in tests, for example
   
https://github.com/apache/iceberg-python/blob/80135451d030569259d83674ef147e0d6f62fd51/tests/integration/test_register_table.py#L40-L59
   
   ```suggestion
   def _create_table_with_schema(
       catalog: Catalog, schema: Schema, format_version: str, partition_spec: 
PartitionSpec = UNPARTITIONED_PARTITION_SPEC
   ) -> Table:
   ```



##########
pyiceberg/partitioning.py:
##########
@@ -249,6 +249,26 @@ def partition_to_path(self, data: Record, schema: Schema) 
-> str:
 UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
 
 
+def validate_partition_name(
+    field_name: str,
+    partition_transform: Transform[Any, Any],
+    source_id: int,
+    schema: Schema,
+) -> None:
+    """Validate that a partition field name doesn't conflict with schema field 
names."""
+    try:
+        schema_field = schema.find_field(field_name)
+    except ValueError:
+        return  # No conflict if field doesn't exist in schema
+
+    if isinstance(partition_transform, (IdentityTransform, VoidTransform)):
+        # For identity transforms, allow conflict only if sourced from the 
same schema field
+        if schema_field.field_id != source_id:
+            raise ValueError(f"Cannot create identity partition from a 
different source field in the schema: {field_name}")
+    else:

Review Comment:
   Match the Java error message:
   ```suggestion
               raise ValueError(f"Cannot create identity partition sourced from 
different field in schema: {field_name}")
       else:
   ```



##########
pyiceberg/table/update/spec.py:
##########
@@ -244,6 +240,13 @@ def _add_new_field(
                 partition_fields.append(new_field)
 
         for added_field in self._adds:
+            _check_and_add_partition_name(
+                self._transaction.table_metadata.schema(),
+                added_field.name,
+                added_field.source_id,
+                added_field.transform,
+                partition_names,
+            )

Review Comment:
   Good catch. Just to confirm: does this cover the newly added partition fields?



##########
pyiceberg/partitioning.py:
##########
@@ -249,6 +249,26 @@ def partition_to_path(self, data: Record, schema: Schema) 
-> str:
 UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
 
 
+def validate_partition_name(
+    field_name: str,
+    partition_transform: Transform[Any, Any],
+    source_id: int,
+    schema: Schema,
+) -> None:
+    """Validate that a partition field name doesn't conflict with schema field 
names."""
+    try:
+        schema_field = schema.find_field(field_name)
+    except ValueError:
+        return  # No conflict if field doesn't exist in schema
+
+    if isinstance(partition_transform, (IdentityTransform, VoidTransform)):
+        # For identity transforms, allow conflict only if sourced from the 
same schema field

Review Comment:
   ```suggestion
           # For identity and void transforms, allow conflict only if sourced 
from the same schema field
   ```



##########
pyiceberg/table/update/schema.py:
##########
@@ -658,6 +658,14 @@ def _apply(self) -> Schema:
 
         # Check the field-ids
         new_schema = Schema(*struct.fields)
+        if self._transaction is not None:
+            from pyiceberg.partitioning import validate_partition_name
+
+            for spec in self._transaction.table_metadata.partition_specs:
+                for partition_field in spec.fields:
+                    validate_partition_name(
+                        partition_field.name, partition_field.transform, 
partition_field.source_id, new_schema
+                    )

Review Comment:
   I think there should always be a `self._transaction`:
   ```suggestion
           from pyiceberg.partitioning import validate_partition_name
   
           for spec in self._transaction.table_metadata.partition_specs:
               for partition_field in spec.fields:
                   validate_partition_name(
                       partition_field.name, partition_field.transform, 
partition_field.source_id, new_schema
                   )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to