This is an automated email from the ASF dual-hosted git repository.
sungwy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 67e87730 Use the correct spec when rewriting existing manifests (#1157)
67e87730 is described below
commit 67e877303f417ac0a2b20db9752231b59fe1775c
Author: Fokko Driesprong <[email protected]>
AuthorDate: Wed Sep 11 22:42:09 2024 +0200
Use the correct spec when rewriting existing manifests (#1157)
* Use the correct spec when rewriting existing manifests
Fixes #1108
* Rename test
---
pyiceberg/table/update/snapshot.py | 2 +-
tests/integration/test_writes/test_writes.py | 75 +++++++++++++++++++++-------
2 files changed, 58 insertions(+), 19 deletions(-)
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index 8b8614db..2b9354d2 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -545,7 +545,7 @@ class OverwriteFiles(_SnapshotProducer["OverwriteFiles"]):
if any(entry.data_file not in found_deleted_data_files for
entry in entries):
with write_manifest(
format_version=self._transaction.table_metadata.format_version,
- spec=self._transaction.table_metadata.spec(),
+ spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
schema=self._transaction.table_metadata.schema(),
output_file=self.new_manifest_output(),
snapshot_id=self._snapshot_id,
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index ce5b1474..fc2746c6 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -18,7 +18,7 @@
import math
import os
import time
-from datetime import date, datetime
+from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any, Dict
from urllib.parse import urlparse
@@ -26,6 +26,7 @@ from urllib.parse import urlparse
import numpy as np
import pandas as pd
import pyarrow as pa
+import pyarrow.compute as pc
import pyarrow.parquet as pq
import pytest
import pytz
@@ -39,12 +40,12 @@ from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError
-from pyiceberg.expressions import GreaterThanOrEqual, In, Not
+from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, LessThan, Not
from pyiceberg.io.pyarrow import _dataframe_to_data_files
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
-from pyiceberg.transforms import DayTransform, IdentityTransform
+from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
from pyiceberg.types import (
DateType,
DoubleType,
@@ -1344,18 +1345,7 @@ def test_overwrite_all_data_with_filter(session_catalog:
Catalog) -> None:
@pytest.mark.integration
-def test_delete_threshold() -> None:
- catalog = load_catalog(
- "local",
- **{
- "type": "rest",
- "uri": "http://localhost:8181",
- "s3.endpoint": "http://localhost:9000",
- "s3.access-key-id": "admin",
- "s3.secret-access-key": "password",
- },
- )
-
+def test_delete_threshold(session_catalog: Catalog) -> None:
schema = Schema(
NestedField(field_id=101, name="id", field_type=LongType(),
required=True),
NestedField(field_id=103, name="created_at", field_type=DateType(),
required=False),
@@ -1365,13 +1355,13 @@ def test_delete_threshold() -> None:
partition_spec = PartitionSpec(PartitionField(source_id=103,
field_id=2000, transform=DayTransform(), name="created_at_day"))
try:
- catalog.drop_table(
+ session_catalog.drop_table(
identifier="default.scores",
)
except NoSuchTableError:
pass
- catalog.create_table(
+ session_catalog.create_table(
identifier="default.scores",
schema=schema,
partition_spec=partition_spec,
@@ -1395,7 +1385,7 @@ def test_delete_threshold() -> None:
# Create the dataframe
df = pd.DataFrame({"id": id_column, "created_at": created_at_column,
"relevancy_score": relevancy_score_column})
- iceberg_table = catalog.load_table("default.scores")
+ iceberg_table = session_catalog.load_table("default.scores")
# Convert the pandas DataFrame to a PyArrow Table with the Iceberg schema
arrow_schema = iceberg_table.schema().as_arrow()
@@ -1409,3 +1399,52 @@ def test_delete_threshold() -> None:
assert
len(iceberg_table.scan(row_filter=Not(delete_condition)).to_arrow()) ==
lower_before
iceberg_table.delete(delete_condition)
assert len(iceberg_table.scan().to_arrow()) == lower_before
+
+
+@pytest.mark.integration
+def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) -> None:
+ np.random.seed(876)
+ N = 1440
+ d = {
+ "timestamp": pa.array([datetime(2023, 1, 1, 0, 0, 0) +
timedelta(minutes=i) for i in range(N)]),
+ "category": pa.array([np.random.choice(["A", "B", "C"]) for _ in
range(N)]),
+ "value": pa.array(np.random.normal(size=N)),
+ }
+ data = pa.Table.from_pydict(d)
+
+ try:
+ session_catalog.drop_table(
+ identifier="default.test_error_table",
+ )
+ except NoSuchTableError:
+ pass
+
+ table = session_catalog.create_table(
+ "default.test_error_table",
+ schema=data.schema,
+ )
+
+ with table.update_spec() as update:
+ update.add_field("timestamp", transform=HourTransform())
+
+ table.append(data)
+
+ with table.update_spec() as update:
+ update.add_field("category", transform=IdentityTransform())
+
+ data_ = data.filter(
+ (pc.field("category") == "A")
+ & (pc.field("timestamp") >= datetime(2023, 1, 1, 0))
+ & (pc.field("timestamp") < datetime(2023, 1, 1, 1))
+ )
+
+ table.overwrite(
+ df=data_,
+ overwrite_filter=And(
+ And(
+ GreaterThanOrEqual("timestamp", datetime(2023, 1, 1, 0).isoformat()),
+ LessThan("timestamp", datetime(2023, 1, 1, 1).isoformat()),
+ ),
+ EqualTo("category", "A"),
+ ),
+ )