Fokko commented on code in PR #3149:
URL: https://github.com/apache/iceberg-python/pull/3149#discussion_r3000066172


##########
pyiceberg/table/__init__.py:
##########
@@ -542,12 +542,69 @@ def dynamic_partition_overwrite(
             )
         )
 
-        partitions_to_overwrite = {data_file.partition for data_file in data_files}
-        delete_filter = self._build_partition_predicate(
-            partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema()
-        )
-        self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
+        # partitions_to_overwrite = {data_file.partition for data_file in data_files}
+        # delete_filter = self._build_partition_predicate(
+        #     partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema()
+        # )
+        # self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
+
+        # Build the partition predicate per-spec to handle tables that have
+        # undergone partition spec evolution. Manifests in the snapshot may be
+        # written under different (older) specs. We need to project the overwrite
+        # partitions into each historical spec's coordinate space so that the
+        # manifest evaluator correctly identifies which old manifests to delete.
+        current_spec = self.table_metadata.spec()
+        current_schema = self.table_metadata.schema()
 
+        # Collect the source column names (e.g. "category") that are being
+        # overwritten — these are stable across spec evolution (only field IDs matter).
+        overwrite_source_ids = {field.source_id for field in current_spec.fields}
+
+        delete_filter: BooleanExpression = AlwaysFalse()
+
+        # For each historical spec in the snapshot, build a predicate using
+        # only the fields that spec knows about, matched against the
+        # corresponding positions in the new data files' partition records.
+        snapshot = self.table_metadata.snapshot_by_name(branch or MAIN_BRANCH)
+        if snapshot is not None:
+            spec_ids_in_snapshot = {m.partition_spec_id for m in snapshot.manifests(io=self._table.io)}

Review Comment:
   Also, iterating over the manifests is pretty expensive. Do we need to do 
that upfront?
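   One way to avoid the upfront scan is to build each spec's predicate lazily, memoized by `partition_spec_id`, so the work happens at most once per distinct spec and only for specs actually seen while walking the manifests. A minimal sketch of that idea — the spec registry, field names, and predicate shape here are illustrative stand-ins, not pyiceberg's API:

```python
from functools import lru_cache

# Hypothetical stand-in for the table's spec registry:
# partition-spec ID -> the partition field names that spec knows about.
spec_fields_by_id = {0: ["category"], 1: ["category", "day"]}

@lru_cache(maxsize=None)
def predicate_for_spec(spec_id: int) -> str:
    # Built at most once per distinct spec ID, and only when a manifest
    # carrying that spec ID is actually encountered -- no separate
    # upfront pass over all manifests to collect spec IDs first.
    return " AND ".join(f"{name} = ?" for name in spec_fields_by_id[spec_id])

# Single pass over the manifests (spec IDs per entry are hypothetical):
manifest_spec_ids = [0, 0, 1, 0]
predicates = [predicate_for_spec(sid) for sid in manifest_spec_ids]
```

   With this shape, the set of spec IDs never needs to be materialized ahead of time; the cache fills in during the same pass that evaluates the manifests.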



##########
pyiceberg/table/__init__.py:
##########
@@ -542,12 +542,69 @@ def dynamic_partition_overwrite(
             )
         )
 
-        partitions_to_overwrite = {data_file.partition for data_file in data_files}
-        delete_filter = self._build_partition_predicate(
-            partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema()
-        )
-        self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
+        # partitions_to_overwrite = {data_file.partition for data_file in data_files}
+        # delete_filter = self._build_partition_predicate(
+        #     partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema()
+        # )
+        # self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
+
+        # Build the partition predicate per-spec to handle tables that have
+        # undergone partition spec evolution. Manifests in the snapshot may be
+        # written under different (older) specs. We need to project the overwrite
+        # partitions into each historical spec's coordinate space so that the
+        # manifest evaluator correctly identifies which old manifests to delete.
+        current_spec = self.table_metadata.spec()
+        current_schema = self.table_metadata.schema()
 
+        # Collect the source column names (e.g. "category") that are being
+        # overwritten — these are stable across spec evolution (only field IDs matter).
+        overwrite_source_ids = {field.source_id for field in current_spec.fields}
+
+        delete_filter: BooleanExpression = AlwaysFalse()
+
+        # For each historical spec in the snapshot, build a predicate using
+        # only the fields that spec knows about, matched against the
+        # corresponding positions in the new data files' partition records.
+        snapshot = self.table_metadata.snapshot_by_name(branch or MAIN_BRANCH)
+        if snapshot is not None:
+            spec_ids_in_snapshot = {m.partition_spec_id for m in snapshot.manifests(io=self._table.io)}

Review Comment:
   I think this logic is a bit wonky: if the branch doesn't exist, we silently fall back to the main branch.
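   A hedged sketch of the stricter resolution the comment is asking for — default to main only when no branch was requested, and fail loudly when a named branch is missing. The function name, `refs` mapping, and signature are illustrative, not pyiceberg's actual API:

```python
from typing import Optional

MAIN_BRANCH = "main"

def snapshot_for_branch(refs: dict, branch: Optional[str]):
    """Resolve a branch explicitly: fall back to main only when no
    branch was requested, and raise (rather than silently using main)
    when the named branch does not exist."""
    name = MAIN_BRANCH if branch is None else branch
    if name not in refs:
        raise ValueError(f"Branch {name!r} does not exist")
    return refs[name]
```

   The point of the design is that `branch or MAIN_BRANCH` conflates "no branch given" with "branch missing"; resolving the name first and then checking membership keeps the two cases distinct.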



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

