This is an automated email from the ASF dual-hosted git repository.

sungwy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 67e87730 Use the correct spec when rewriting existing manifests (#1157)
67e87730 is described below

commit 67e877303f417ac0a2b20db9752231b59fe1775c
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Sep 11 22:42:09 2024 +0200

    Use the correct spec when rewriting existing manifests (#1157)
    
    * Use the correct spec when rewriting existing manifests
    
    Fixes #1108
    
    * Rename test
---
 pyiceberg/table/update/snapshot.py           |  2 +-
 tests/integration/test_writes/test_writes.py | 75 +++++++++++++++++++++-------
 2 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/pyiceberg/table/update/snapshot.py 
b/pyiceberg/table/update/snapshot.py
index 8b8614db..2b9354d2 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -545,7 +545,7 @@ class OverwriteFiles(_SnapshotProducer["OverwriteFiles"]):
                     if any(entry.data_file not in found_deleted_data_files for 
entry in entries):
                         with write_manifest(
                             
format_version=self._transaction.table_metadata.format_version,
-                            spec=self._transaction.table_metadata.spec(),
+                            spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
                             schema=self._transaction.table_metadata.schema(),
                             output_file=self.new_manifest_output(),
                             snapshot_id=self._snapshot_id,
diff --git a/tests/integration/test_writes/test_writes.py 
b/tests/integration/test_writes/test_writes.py
index ce5b1474..fc2746c6 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -18,7 +18,7 @@
 import math
 import os
 import time
-from datetime import date, datetime
+from datetime import date, datetime, timedelta
 from pathlib import Path
 from typing import Any, Dict
 from urllib.parse import urlparse
@@ -26,6 +26,7 @@ from urllib.parse import urlparse
 import numpy as np
 import pandas as pd
 import pyarrow as pa
+import pyarrow.compute as pc
 import pyarrow.parquet as pq
 import pytest
 import pytz
@@ -39,12 +40,12 @@ from pyiceberg.catalog.hive import HiveCatalog
 from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import NoSuchTableError
-from pyiceberg.expressions import GreaterThanOrEqual, In, Not
+from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, 
LessThan, Not
 from pyiceberg.io.pyarrow import _dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
-from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
 from pyiceberg.types import (
     DateType,
     DoubleType,
@@ -1344,18 +1345,7 @@ def test_overwrite_all_data_with_filter(session_catalog: 
Catalog) -> None:
 
 
 @pytest.mark.integration
-def test_delete_threshold() -> None:
-    catalog = load_catalog(
-        "local",
-        **{
-            "type": "rest",
-            "uri": "http://localhost:8181",
-            "s3.endpoint": "http://localhost:9000",
-            "s3.access-key-id": "admin",
-            "s3.secret-access-key": "password",
-        },
-    )
-
+def test_delete_threshold(session_catalog: Catalog) -> None:
     schema = Schema(
         NestedField(field_id=101, name="id", field_type=LongType(), 
required=True),
         NestedField(field_id=103, name="created_at", field_type=DateType(), 
required=False),
@@ -1365,13 +1355,13 @@ def test_delete_threshold() -> None:
     partition_spec = PartitionSpec(PartitionField(source_id=103, 
field_id=2000, transform=DayTransform(), name="created_at_day"))
 
     try:
-        catalog.drop_table(
+        session_catalog.drop_table(
             identifier="default.scores",
         )
     except NoSuchTableError:
         pass
 
-    catalog.create_table(
+    session_catalog.create_table(
         identifier="default.scores",
         schema=schema,
         partition_spec=partition_spec,
@@ -1395,7 +1385,7 @@ def test_delete_threshold() -> None:
     # Create the dataframe
     df = pd.DataFrame({"id": id_column, "created_at": created_at_column, 
"relevancy_score": relevancy_score_column})
 
-    iceberg_table = catalog.load_table("default.scores")
+    iceberg_table = session_catalog.load_table("default.scores")
 
     # Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
     arrow_schema = iceberg_table.schema().as_arrow()
@@ -1409,3 +1399,52 @@ def test_delete_threshold() -> None:
     assert 
len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) == 
lower_before
     iceberg_table.delete(delete_condition)
     assert len(iceberg_table.scan().to_arrow()) == lower_before
+
+
+@pytest.mark.integration
+def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) 
-> None:
+    np.random.seed(876)
+    N = 1440
+    d = {
+        "timestamp": pa.array([datetime(2023, 1, 1, 0, 0, 0) + 
timedelta(minutes=i) for i in range(N)]),
+        "category": pa.array([np.random.choice(["A", "B", "C"]) for _ in 
range(N)]),
+        "value": pa.array(np.random.normal(size=N)),
+    }
+    data = pa.Table.from_pydict(d)
+
+    try:
+        session_catalog.drop_table(
+            identifier="default.test_error_table",
+        )
+    except NoSuchTableError:
+        pass
+
+    table = session_catalog.create_table(
+        "default.test_error_table",
+        schema=data.schema,
+    )
+
+    with table.update_spec() as update:
+        update.add_field("timestamp", transform=HourTransform())
+
+    table.append(data)
+
+    with table.update_spec() as update:
+        update.add_field("category", transform=IdentityTransform())
+
+    data_ = data.filter(
+        (pc.field("category") == "A")
+        & (pc.field("timestamp") >= datetime(2023, 1, 1, 0))
+        & (pc.field("timestamp") < datetime(2023, 1, 1, 1))
+    )
+
+    table.overwrite(
+        df=data_,
+        overwrite_filter=And(
+            And(
+                GreaterThanOrEqual("timestamp", datetime(2023, 1, 1, 
0).isoformat()),
+                LessThan("timestamp", datetime(2023, 1, 1, 1).isoformat()),
+            ),
+            EqualTo("category", "A"),
+        ),
+    )

Reply via email to