This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new fba79ba4 abort the whole table transaction if any updates in the
transaction has failed (#1246)
fba79ba4 is described below
commit fba79ba441aae887d7afa96bb16792ed7b959ccf
Author: stevie9868 <[email protected]>
AuthorDate: Tue Oct 29 10:03:19 2024 -0700
abort the whole table transaction if any updates in the transaction has
failed (#1246)
* abort the whole transaction if any update on the chain has failed
* Update tests/integration/test_writes/test_writes.py
Co-authored-by: Kevin Liu <[email protected]>
* Update tests/integration/test_writes/test_writes.py
Co-authored-by: Kevin Liu <[email protected]>
* add type:ignore to prevent lint error
---------
Co-authored-by: Yingjian Wu <[email protected]>
Co-authored-by: Kevin Liu <[email protected]>
---
pyiceberg/table/__init__.py | 11 ++++++++---
tests/integration/test_writes/test_writes.py | 24 ++++++++++++++++++++++++
2 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index f94d9c8a..264afd89 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -23,6 +23,7 @@ from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import cached_property
from itertools import chain
+from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
@@ -33,6 +34,7 @@ from typing import (
Optional,
Set,
Tuple,
+ Type,
TypeVar,
Union,
)
@@ -237,9 +239,12 @@ class Transaction:
"""Start a transaction to update the table."""
return self
- def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
- """Close and commit the transaction."""
- self.commit_transaction()
+ def __exit__(
+ self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
+ ) -> None:
+ """Close and commit the transaction if no exceptions have been raised."""
+ if exctype is None and excinst is None and exctb is None:
+ self.commit_transaction()
def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction:
"""Check if the requirements are met, and applies the updates to the metadata."""
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index fc2746c6..9cccb542 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -1448,3 +1448,27 @@ def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) ->
EqualTo("category", "A"),
),
)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_abort_table_transaction_on_exception(
+ spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
+) -> None:
+ identifier = "default.table_test_abort_table_transaction_on_exception"
+ tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version})
+
+ # Pre-populate some data
+ tbl.append(arrow_table_with_null)
+ table_size = len(arrow_table_with_null)
+ assert len(tbl.scan().to_pandas()) == table_size
+
+ # Try to commit a transaction that raises an exception midway
+ with pytest.raises(ValueError):
+ with tbl.transaction() as txn:
+ txn.append(arrow_table_with_null)
+ raise ValueError
+ txn.append(arrow_table_with_null) # type: ignore
+
+ # Validate the transaction is aborted and no partial update is applied
+ assert len(tbl.scan().to_pandas()) == table_size # type: ignore