This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 2d8397ea feat: Add `snapshot_properties` to upsert operation (#2829)
2d8397ea is described below
commit 2d8397ea2a7c99cf80528bae644b96e3aceb43cf
Author: Somasundaram Sekar <[email protected]>
AuthorDate: Mon Jan 5 10:52:21 2026 +0100
feat: Add `snapshot_properties` to upsert operation (#2829)
## Summary
- Added `snapshot_properties` parameter to `Transaction.upsert()` and
`Table.upsert()` methods
- The properties are passed to both underlying `overwrite()` and
`append()` operations, so they are applied to all snapshots created by
the upsert
- Added test to verify snapshot properties are correctly applied
## Background
Currently, the `upsert()` operation doesn't support
`snapshot_properties`, while other operations like `append()`,
`overwrite()`, and `delete()` do.
Since upsert creates multiple snapshots (one from `overwrite()` for
updates and one from `append()` for inserts), the `snapshot_properties`
are applied to all of them, which is consistent with how `overwrite()`
handles properties internally.
Closes #2659
## Test plan
- [x] Added `test_upsert_snapshot_properties` test that verifies
properties are applied to all snapshots created by upsert
- [x] All existing upsert tests pass (22 tests)
- [x] All lint checks pass
Co-authored-by: Somasundaram Sekar <[email protected]>
---
pyiceberg/table/__init__.py | 8 ++++++-
tests/table/test_upsert.py | 51 +++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 58 insertions(+), 1 deletion(-)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 2e26a4cc..88a7bd00 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -734,6 +734,7 @@ class Transaction:
when_not_matched_insert_all: bool = True,
case_sensitive: bool = True,
branch: str | None = MAIN_BRANCH,
+ snapshot_properties: dict[str, str] = EMPTY_DICT,
) -> UpsertResult:
"""Shorthand API for performing an upsert to an iceberg table.
@@ -745,6 +746,7 @@ class Transaction:
when_not_matched_insert_all: Bool indicating new rows to be
inserted that do not match any existing rows in the table
case_sensitive: Bool indicating if the match should be
case-sensitive
branch: Branch Reference to run the upsert operation
+ snapshot_properties: Custom properties to be added to the snapshot
summary
To learn more about the identifier-field-ids:
https://iceberg.apache.org/spec/#identifier-field-ids
@@ -861,12 +863,13 @@ class Transaction:
rows_to_update,
overwrite_filter=Or(*overwrite_predicates) if
len(overwrite_predicates) > 1 else overwrite_predicates[0],
branch=branch,
+ snapshot_properties=snapshot_properties,
)
if when_not_matched_insert_all:
insert_row_cnt = len(rows_to_insert)
if rows_to_insert:
- self.append(rows_to_insert, branch=branch)
+ self.append(rows_to_insert, branch=branch,
snapshot_properties=snapshot_properties)
return UpsertResult(rows_updated=update_row_cnt,
rows_inserted=insert_row_cnt)
@@ -1327,6 +1330,7 @@ class Table:
when_not_matched_insert_all: bool = True,
case_sensitive: bool = True,
branch: str | None = MAIN_BRANCH,
+ snapshot_properties: dict[str, str] = EMPTY_DICT,
) -> UpsertResult:
"""Shorthand API for performing an upsert to an iceberg table.
@@ -1338,6 +1342,7 @@ class Table:
when_not_matched_insert_all: Bool indicating new rows to be
inserted that do not match any existing rows in the table
case_sensitive: Bool indicating if the match should be
case-sensitive
branch: Branch Reference to run the upsert operation
+ snapshot_properties: Custom properties to be added to the snapshot
summary
To learn more about the identifier-field-ids:
https://iceberg.apache.org/spec/#identifier-field-ids
@@ -1371,6 +1376,7 @@ class Table:
when_not_matched_insert_all=when_not_matched_insert_all,
case_sensitive=case_sensitive,
branch=branch,
+ snapshot_properties=snapshot_properties,
)
def append(self, df: pa.Table, snapshot_properties: dict[str, str] =
EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None:
diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 891d4bba..35a3a119 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -834,3 +834,54 @@ def test_stage_only_upsert(catalog: Catalog) -> None:
assert operations == ["append", "append", "append"]
# both subsequent parent id should be the first snapshot id
assert parent_snapshot_id == [None, current_snapshot, current_snapshot]
+
+
+def test_upsert_snapshot_properties(catalog: Catalog) -> None:
+ """Test that snapshot_properties are applied to snapshots created by
upsert."""
+ identifier = "default.test_upsert_snapshot_properties"
+ _drop_table(catalog, identifier)
+
+ schema = Schema(
+ NestedField(1, "city", StringType(), required=True),
+ NestedField(2, "population", IntegerType(), required=True),
+ identifier_field_ids=[1],
+ )
+
+ tbl = catalog.create_table(identifier, schema=schema)
+ arrow_schema = pa.schema(
+ [
+ pa.field("city", pa.string(), nullable=False),
+ pa.field("population", pa.int32(), nullable=False),
+ ]
+ )
+
+ # Initial data
+ df = pa.Table.from_pylist(
+ [{"city": "Amsterdam", "population": 921402}],
+ schema=arrow_schema,
+ )
+ tbl.append(df)
+ initial_snapshot_count = len(list(tbl.snapshots()))
+
+ # Upsert with snapshot_properties (both update and insert)
+ df = pa.Table.from_pylist(
+ [
+ {"city": "Amsterdam", "population": 950000}, # Update
+ {"city": "Berlin", "population": 3432000}, # Insert
+ ],
+ schema=arrow_schema,
+ )
+ result = tbl.upsert(df, snapshot_properties={"test_prop": "test_value"})
+
+ assert result.rows_updated == 1
+ assert result.rows_inserted == 1
+
+ # Verify properties are on the snapshots created by upsert
+ snapshots = list(tbl.snapshots())
+ # Upsert should have created additional snapshots (overwrite + append)
+ assert len(snapshots) > initial_snapshot_count
+
+ # Check that all new snapshots have the snapshot_properties
+ for snapshot in snapshots[initial_snapshot_count:]:
+ assert snapshot.summary is not None
+ assert snapshot.summary.additional_properties.get("test_prop") ==
"test_value"