This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 1e6ec0ec Use original `partition-spec-id` when marking a file as deleted (#984)
1e6ec0ec is described below
commit 1e6ec0ecb95e73308e254d81f381eb0872d767b4
Author: Sung Yun <[email protected]>
AuthorDate: Wed Jul 31 01:26:57 2024 -0400
Use original `partition-spec-id` when marking a file as deleted (#984)
Co-authored-by: Sung Yun <[email protected]>
---
 pyiceberg/table/__init__.py       | 26 ++++++-----
 tests/integration/test_deletes.py | 91 ++++++++++++++++++++++++++++++++++++++-
 2 files changed, 106 insertions(+), 11 deletions(-)
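In short: the previous code wrote all delete entries into a single manifest using the table's current partition spec (`table_metadata.spec()`). After partition evolution, files written under an older spec would then be referenced from a delete manifest carrying the wrong `partition-spec-id`. The change below groups the deleted entries by each data file's original `spec_id` and writes one delete manifest per spec, looking each spec up via `table_metadata.specs()[spec_id]`.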
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 34e9d2c5..5188152a 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -3135,16 +3135,22 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
         # Check if we need to mark the files as deleted
         deleted_entries = self._deleted_entries()
         if len(deleted_entries) > 0:
-            with write_manifest(
-                format_version=self._transaction.table_metadata.format_version,
-                spec=self._transaction.table_metadata.spec(),
-                schema=self._transaction.table_metadata.schema(),
-                output_file=self.new_manifest_output(),
-                snapshot_id=self._snapshot_id,
-            ) as writer:
-                for delete_entry in deleted_entries:
-                    writer.add_entry(delete_entry)
-            return [writer.to_manifest_file()]
+            deleted_manifests = []
+            partition_groups: Dict[int, List[ManifestEntry]] = defaultdict(list)
+            for deleted_entry in deleted_entries:
+                partition_groups[deleted_entry.data_file.spec_id].append(deleted_entry)
+            for spec_id, entries in partition_groups.items():
+                with write_manifest(
+                    format_version=self._transaction.table_metadata.format_version,
+                    spec=self._transaction.table_metadata.specs()[spec_id],
+                    schema=self._transaction.table_metadata.schema(),
+                    output_file=self.new_manifest_output(),
+                    snapshot_id=self._snapshot_id,
+                ) as writer:
+                    for entry in entries:
+                        writer.add_entry(entry)
+                deleted_manifests.append(writer.to_manifest_file())
+            return deleted_manifests
         else:
             return []
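For readers outside the codebase, here is a minimal, self-contained sketch of the grouping pattern the hunk above introduces. `Entry` and `DataFile` are hypothetical stand-ins for the real pyiceberg classes, which carry many more fields:

    from collections import defaultdict
    from dataclasses import dataclass
    from typing import Dict, List

    @dataclass
    class DataFile:
        spec_id: int  # partition spec the file was originally written under
        path: str

    @dataclass
    class Entry:
        data_file: DataFile

    def group_by_spec(entries: List[Entry]) -> Dict[int, List[Entry]]:
        # One bucket per original partition spec; each bucket becomes its own
        # delete manifest, since a manifest file is bound to a single spec.
        groups: Dict[int, List[Entry]] = defaultdict(list)
        for entry in entries:
            groups[entry.data_file.spec_id].append(entry)
        return groups

    entries = [Entry(DataFile(0, "a.parquet")), Entry(DataFile(1, "b.parquet"))]
    assert sorted(group_by_spec(entries)) == [0, 1]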
diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py
index 49de265a..4bddf09b 100644
--- a/tests/integration/test_deletes.py
+++ b/tests/integration/test_deletes.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
+from datetime import datetime
 from typing import List
 
 import pyarrow as pa
@@ -25,9 +26,11 @@ from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.expressions import AlwaysTrue, EqualTo
 from pyiceberg.manifest import ManifestEntryStatus
+from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table.snapshots import Operation, Summary
-from pyiceberg.types import FloatType, IntegerType, NestedField
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.types import FloatType, IntegerType, LongType, NestedField, TimestampType
 
 
 def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
@@ -556,3 +559,89 @@ def test_delete_overwrite_table_with_nan(session_catalog: RestCatalog) -> None:
     assert 2.0 in result
     assert 3.0 in result
     assert 4.0 in result
+
+
[email protected]
+def test_delete_after_partition_evolution_from_unpartitioned(session_catalog: RestCatalog) -> None:
+    identifier = "default.test_delete_after_partition_evolution_from_unpartitioned"
+
+ arrow_table = pa.Table.from_arrays(
+ [
+ pa.array([2, 3, 4, 5, 6]),
+ ],
+ names=["idx"],
+ )
+
+ try:
+ session_catalog.drop_table(identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(
+ identifier,
+ schema=Schema(
+ NestedField(1, "idx", LongType()),
+ ),
+ )
+
+ tbl.append(arrow_table)
+
+ with tbl.transaction() as tx:
+ with tx.update_schema() as schema:
+ schema.rename_column("idx", "id")
+ with tx.update_spec() as spec:
+ spec.add_field("id", IdentityTransform())
+
+ # Append one more time to create data files with two partition specs
+ tbl.append(arrow_table.rename_columns(["id"]))
+
+ tbl.delete("id == 4")
+
+    # Expect 8 records: 10 total minus the 2 rows (one per append) where id == 4
+ assert len(tbl.scan().to_arrow()) == 8
+
+
[email protected]
+def test_delete_after_partition_evolution_from_partitioned(session_catalog: RestCatalog) -> None:
+    identifier = "default.test_delete_after_partition_evolution_from_partitioned"
+
+ arrow_table = pa.Table.from_arrays(
+ [
+ pa.array([2, 3, 4, 5, 6]),
+ pa.array([
+ datetime(2021, 5, 19),
+ datetime(2022, 7, 25),
+ datetime(2023, 3, 22),
+ datetime(2024, 7, 17),
+ datetime(2025, 2, 22),
+ ]),
+ ],
+ names=["idx", "ts"],
+ )
+
+ try:
+ session_catalog.drop_table(identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(
+ identifier,
+        schema=Schema(NestedField(1, "idx", LongType()), NestedField(2, "ts", TimestampType())),
+        partition_spec=PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="ts")),
+ )
+
+ tbl.append(arrow_table)
+
+ with tbl.transaction() as tx:
+ with tx.update_schema() as schema:
+ schema.rename_column("idx", "id")
+ with tx.update_spec() as spec:
+ spec.add_field("id", IdentityTransform())
+
+ # Append one more time to create data files with two partition specs
+ tbl.append(arrow_table.rename_columns(["id", "ts"]))
+
+ tbl.delete("id == 4")
+
+    # Expect 8 records: 10 total minus the 2 rows (one per append) where id == 4
+ assert len(tbl.scan().to_arrow()) == 8
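A usage note: one way to eyeball the fix after running a delete like the tests above is to list the manifests of the new snapshot and check their spec ids. This is a hedged sketch, assuming `Table.current_snapshot()`, `Snapshot.manifests(io)`, and the `ManifestFile.partition_spec_id` field as they exist in pyiceberg around this commit:

    # After tbl.delete("id == 4"), the delete manifests should be written
    # with the spec id each deleted file was created under, not blindly
    # with the table's current (highest) spec id.
    snapshot = tbl.current_snapshot()
    assert snapshot is not None
    for manifest in snapshot.manifests(tbl.io):
        # Every manifest's spec id should be one of the table's known specs.
        assert manifest.partition_spec_id in tbl.metadata.specs()
        print(manifest.partition_spec_id, manifest.content)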