This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 3eecdadc Add close option to Catalog (#2390)
3eecdadc is described below

commit 3eecdadc000047ec30749fc5d6ce1f2f072a30b2
Author: Kristofer Gaudel <68076186+kris-gau...@users.noreply.github.com>
AuthorDate: Thu Aug 28 03:59:27 2025 -0400

    Add close option to Catalog (#2390)
---
 pyiceberg/catalog/__init__.py | 28 ++++++++++++++++++++++
 pyiceberg/catalog/sql.py      | 11 +++++++++
 tests/catalog/test_base.py    | 23 +++++++++++++++++-
 tests/catalog/test_sql.py     | 55 ++++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index a143ad09..44d8c6ec 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -26,6 +26,7 @@ from dataclasses import dataclass
 from enum import Enum
 from typing import (
     TYPE_CHECKING,
+    Any,
     Callable,
     Dict,
     List,
@@ -793,6 +794,33 @@ class Catalog(ABC):
             removed_previous_metadata_files.difference_update(current_metadata_files)
             delete_files(io, removed_previous_metadata_files, METADATA)
 
+    def close(self) -> None:  # noqa: B027
+        """Close the catalog and release any resources.
+
+        This method should be called when the catalog is no longer needed to ensure
+        proper cleanup of resources like database connections, file handles, etc.
+
+        Default implementation does nothing. Override in subclasses that need cleanup.
+        """
+
+    def __enter__(self) -> "Catalog":
+        """Enter the context manager.
+
+        Returns:
+            Catalog: The catalog instance.
+        """
+        return self
+
+    def __exit__(self, exc_type: Optional[type], exc_val: Optional[BaseException], exc_tb: Optional[Any]) -> None:
+        """Exit the context manager and close the catalog.
+
+        Args:
+            exc_type: Exception type if an exception occurred.
+            exc_val: Exception value if an exception occurred.
+            exc_tb: Exception traceback if an exception occurred.
+        """
+        self.close()
+
     def __repr__(self) -> str:
         """Return the string representation of the Catalog class."""
         return f"{self.name} ({self.__class__})"
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index 0167b5a1..c0746dc9 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -733,3 +733,14 @@ class SqlCatalog(MetastoreCatalog):
 
     def drop_view(self, identifier: Union[str, Identifier]) -> None:
         raise NotImplementedError
+
+    def close(self) -> None:
+        """Close the catalog and release database connections.
+
+        This method closes the SQLAlchemy engine and disposes of all connection pools.
+        This ensures that any cached connections are properly closed, which is especially
+        important for blobfuse scenarios where file handles need to be closed for
+        data to be flushed to persistent storage.
+        """
+        if hasattr(self, "engine"):
+            self.engine.dispose()
diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py
index 01fea25d..80c01f70 100644
--- a/tests/catalog/test_base.py
+++ b/tests/catalog/test_base.py
@@ -48,7 +48,7 @@ from pyiceberg.table.update import (
 )
 from pyiceberg.transforms import IdentityTransform
 from pyiceberg.typedef import EMPTY_DICT, Properties
-from pyiceberg.types import IntegerType, LongType, NestedField
+from pyiceberg.types import IntegerType, LongType, NestedField, StringType
 
 
 @pytest.fixture
@@ -631,3 +631,24 @@ def test_table_metadata_writes_reflect_latest_path(catalog: InMemoryCatalog) ->
     table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: new_metadata_path}).commit_transaction()
 
     assert table.location_provider().new_metadata_location("metadata.json") == f"{new_metadata_path}/metadata.json"
+
+
+class TestCatalogClose:
+    """Test catalog close functionality."""
+
+    def test_in_memory_catalog_close(self, catalog: InMemoryCatalog) -> None:
+        """Test that InMemoryCatalog close method works."""
+        # Should not raise any exception
+        catalog.close()
+
+    def test_in_memory_catalog_context_manager(self, catalog: InMemoryCatalog) -> None:
+        """Test that InMemoryCatalog works as a context manager."""
+        with InMemoryCatalog("test") as cat:
+            assert cat.name == "test"
+            # Create a namespace and table to test functionality
+            cat.create_namespace("test_db")
+            schema = Schema(NestedField(1, "name", StringType(), required=True))
+            cat.create_table(("test_db", "test_table"), schema)
+
+        # InMemoryCatalog inherits close from SqlCatalog, so engine should be disposed
+        assert hasattr(cat, "engine")
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index 23595148..27105e80 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -60,7 +60,7 @@ from pyiceberg.table.sorting import (
 )
 from pyiceberg.transforms import IdentityTransform
 from pyiceberg.typedef import Identifier
-from pyiceberg.types import IntegerType, strtobool
+from pyiceberg.types import IntegerType, NestedField, StringType, strtobool
 
 CATALOG_TABLES = [c.__tablename__ for c in SqlCatalogBaseTable.__subclasses__()]
 
@@ -1704,3 +1704,56 @@ def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Sche
     assert not os.path.exists(original_metadata_location[len("file://") :])
     assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") :])
     assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :])
+
+
+class TestSqlCatalogClose:
+    """Test SqlCatalog close functionality."""
+
+    def test_sql_catalog_close(self, catalog_sqlite: SqlCatalog) -> None:
+        """Test that SqlCatalog close method properly disposes the engine."""
+        # Verify engine exists
+        assert hasattr(catalog_sqlite, "engine")
+
+        # Close the catalog
+        catalog_sqlite.close()
+
+        # Verify engine is disposed by checking that the engine still exists
+        assert hasattr(catalog_sqlite, "engine")
+
+    def test_sql_catalog_context_manager(self, warehouse: Path) -> None:
+        """Test that SqlCatalog works as a context manager."""
+        with SqlCatalog("test", uri="sqlite:///:memory:", warehouse=str(warehouse)) as catalog:
+            # Verify engine exists
+            assert hasattr(catalog, "engine")
+
+            # Create a namespace and table to test functionality
+            catalog.create_namespace("test_db")
+            schema = Schema(NestedField(1, "name", StringType(), required=True))
+            catalog.create_table(("test_db", "test_table"), schema)
+
+        # Verify engine is disposed after exiting context
+        assert hasattr(catalog, "engine")
+
+    def test_sql_catalog_context_manager_with_exception(self) -> None:
+        """Test that SqlCatalog context manager properly closes even with exceptions."""
+        catalog = None
+        try:
+            with SqlCatalog("test", uri="sqlite:///:memory:") as cat:
+                catalog = cat
+                # Verify engine exists
+                assert hasattr(catalog, "engine")
+                raise ValueError("Test exception")
+        except ValueError:
+            pass
+
+        # Verify engine is disposed even after exception
+        assert catalog is not None
+        assert hasattr(catalog, "engine")
+
+    def test_sql_catalog_multiple_close_calls(self, catalog_sqlite: SqlCatalog) -> None:
+        """Test that multiple close calls on SqlCatalog are safe."""
+        # First close
+        catalog_sqlite.close()
+
+        # Second close should not raise an exception
+        catalog_sqlite.close()

Reply via email to