This is an automated email from the ASF dual-hosted git repository.

sungwy pushed a commit to branch pyiceberg-0.7.x
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git

commit ea17a3224c9ef7ee32f1f916ca55d9ba19ca6c5f
Author: Sung Yun <[email protected]>
AuthorDate: Wed Jul 31 01:26:57 2024 -0400

    Use original `partition-spec-id` when marking a file as deleted (#984)
    
    Co-authored-by: Sung Yun <[email protected]>
---
 pyiceberg/table/__init__.py       | 26 ++++++-----
 tests/integration/test_deletes.py | 91 ++++++++++++++++++++++++++++++++++++++-
 2 files changed, 106 insertions(+), 11 deletions(-)
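
For readers skimming the patch, here is a minimal, self-contained sketch of the idea: deleted manifest entries are grouped by the partition-spec-id recorded on each data file, and one delete manifest is written per group, instead of writing a single manifest against the table's current default spec. The FakeDataFile, FakeDeletedEntry, and group_deleted_entries_by_spec names below are illustrative stand-ins for this sketch only, not pyiceberg APIs; the actual change lives in _SnapshotProducer in the diff that follows.

    # Sketch of the grouping step; placeholder classes, not pyiceberg internals.
    from collections import defaultdict
    from dataclasses import dataclass
    from typing import Dict, List

    @dataclass
    class FakeDataFile:
        path: str
        spec_id: int  # partition spec the file was originally written under

    @dataclass
    class FakeDeletedEntry:
        data_file: FakeDataFile

    def group_deleted_entries_by_spec(entries: List[FakeDeletedEntry]) -> Dict[int, List[FakeDeletedEntry]]:
        """Group deleted entries by the original spec_id of their data file."""
        groups: Dict[int, List[FakeDeletedEntry]] = defaultdict(list)
        for entry in entries:
            groups[entry.data_file.spec_id].append(entry)
        return groups

    if __name__ == "__main__":
        entries = [
            FakeDeletedEntry(FakeDataFile("f1.parquet", spec_id=0)),  # written before partition evolution
            FakeDeletedEntry(FakeDataFile("f2.parquet", spec_id=1)),  # written after partition evolution
        ]
        # One delete manifest would be produced per spec_id group (here: spec 0 and spec 1).
        print({spec_id: [e.data_file.path for e in group] for spec_id, group in group_deleted_entries_by_spec(entries).items()})
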

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 34e9d2c5..5188152a 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -3135,16 +3135,22 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
             # Check if we need to mark the files as deleted
             deleted_entries = self._deleted_entries()
             if len(deleted_entries) > 0:
-                with write_manifest(
-                    format_version=self._transaction.table_metadata.format_version,
-                    spec=self._transaction.table_metadata.spec(),
-                    schema=self._transaction.table_metadata.schema(),
-                    output_file=self.new_manifest_output(),
-                    snapshot_id=self._snapshot_id,
-                ) as writer:
-                    for delete_entry in deleted_entries:
-                        writer.add_entry(delete_entry)
-                return [writer.to_manifest_file()]
+                deleted_manifests = []
+                partition_groups: Dict[int, List[ManifestEntry]] = defaultdict(list)
+                for deleted_entry in deleted_entries:
+                    partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry)
+                for spec_id, entries in partition_groups.items():
+                    with write_manifest(
+                        format_version=self._transaction.table_metadata.format_version,
+                        spec=self._transaction.table_metadata.specs()[spec_id],
+                        schema=self._transaction.table_metadata.schema(),
+                        output_file=self.new_manifest_output(),
+                        snapshot_id=self._snapshot_id,
+                    ) as writer:
+                        for entry in entries:
+                            writer.add_entry(entry)
+                    deleted_manifests.append(writer.to_manifest_file())
+                return deleted_manifests
             else:
                 return []
 
diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py
index 49de265a..4bddf09b 100644
--- a/tests/integration/test_deletes.py
+++ b/tests/integration/test_deletes.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
+from datetime import datetime
 from typing import List
 
 import pyarrow as pa
@@ -25,9 +26,11 @@ from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.expressions import AlwaysTrue, EqualTo
 from pyiceberg.manifest import ManifestEntryStatus
+from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table.snapshots import Operation, Summary
-from pyiceberg.types import FloatType, IntegerType, NestedField
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, TimestampType
 
 
 def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
@@ -556,3 +559,89 @@ def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None:
     assert 2.0 in result
     assert 3.0 in result
     assert 4.0 in result
+
+
+@pytest.mark.integration
+def test_delete_after_partition_evolution_from_unpartitioned(session_catalog: RestCatalog) -> None:
+    identifier = "default.test_delete_after_partition_evolution_from_unpartitioned"
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array([2, 3, 4, 5, 6]),
+        ],
+        names=["idx"],
+    )
+
+    try:
+        session_catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(
+        identifier,
+        schema=Schema(
+            NestedField(1, "idx", LongType()),
+        ),
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.transaction() as tx:
+        with tx.update_schema() as schema:
+            schema.rename_column("idx", "id")
+        with tx.update_spec() as spec:
+            spec.add_field("id", IdentityTransform())
+
+    # Append one more time to create data files with two partition specs
+    tbl.append(arrow_table.rename_columns(["id"]))
+
+    tbl.delete("id == 4")
+
+    # Expect 8 records: 10 records - 2
+    assert len(tbl.scan().to_arrow()) == 8
+
+
+@pytest.mark.integration
+def test_delete_after_partition_evolution_from_partitioned(session_catalog: RestCatalog) -> None:
+    identifier = "default.test_delete_after_partition_evolution_from_partitioned"
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array([2, 3, 4, 5, 6]),
+            pa.array([
+                datetime(2021, 5, 19),
+                datetime(2022, 7, 25),
+                datetime(2023, 3, 22),
+                datetime(2024, 7, 17),
+                datetime(2025, 2, 22),
+            ]),
+        ],
+        names=["idx", "ts"],
+    )
+
+    try:
+        session_catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(
+        identifier,
+        schema=Schema(NestedField(1, "idx", LongType()), NestedField(2, "ts", TimestampType())),
+        partition_spec=PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")),
+    )
+
+    tbl.append(arrow_table)
+
+    with tbl.transaction() as tx:
+        with tx.update_schema() as schema:
+            schema.rename_column("idx", "id")
+        with tx.update_spec() as spec:
+            spec.add_field("id", IdentityTransform())
+
+    # Append one more time to create data files with two partition specs
+    tbl.append(arrow_table.rename_columns(["id", "ts"]))
+
+    tbl.delete("id == 4")
+
+    # Expect 8 records: 10 records - 2
+    assert len(tbl.scan().to_arrow()) == 8
