This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 055938d3 Support loading custom catalog impl (#947)
055938d3 is described below

commit 055938d36d46efb94849b1d861cd8a8a111f6ae5
Author: Jack Ye <[email protected]>
AuthorDate: Thu Jul 25 11:30:36 2024 -0700

    Support loading custom catalog impl (#947)
    
    * Support loading custom catalog impl
    
    * add more tests
    
    * fix lint
    
    * fix typo
    
    * fix typo 2
---
 mkdocs/docs/configuration.md  | 20 +++++++++++++++++++-
 pyiceberg/catalog/__init__.py | 29 +++++++++++++++++++++++++++++
 tests/catalog/test_base.py    | 30 +++++++++++++++++++++++++-----
 tests/catalog/test_sql.py     | 15 +++++++++++++++
 4 files changed, 88 insertions(+), 6 deletions(-)

diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 985b8a77..ff374165 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -139,7 +139,13 @@ For the FileIO there are several configuration options 
available:
 
 ## Catalogs
 
-PyIceberg currently has native support for REST, SQL, Hive, Glue and DynamoDB.
+PyIceberg currently has native catalog type support for REST, SQL, Hive, Glue 
and DynamoDB.
+Alternatively, you can also directly set the catalog implementation:
+
+| Key             | Example                      | Description                 
                                                                     |
+| --------------- | ---------------------------- | 
------------------------------------------------------------------------------------------------
 |
+| type            | rest                         | Type of catalog, one of 
`rest`, `sql`, `hive`, `glue`, `dynamodb`. Defaults to `rest`            |
+| py-catalog-impl | mypackage.mymodule.MyCatalog | Sets the catalog explicitly 
to an implementation, and will fail explicitly if it can't be loaded |
 
 There are three ways to pass in configuration:
 
@@ -379,6 +385,18 @@ catalog:
 
 <!-- prettier-ignore-end -->
 
+### Custom Catalog Implementations
+
+If you want to load any custom catalog implementation, you can set catalog 
configurations like the following:
+
+```yaml
+catalog:
+  default:
+    py-catalog-impl: mypackage.mymodule.MyCatalog
+    custom-key1: value1
+    custom-key2: value2
+```
+
 ## Unified AWS Credentials
 
 You can explicitly set the AWS credentials for both Glue/DynamoDB Catalog and 
S3 FileIO by configuring `client.*` properties. For example:
diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index a84bde0d..35f0a5b1 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -17,6 +17,7 @@
 
 from __future__ import annotations
 
+import importlib
 import logging
 import re
 import uuid
@@ -77,6 +78,7 @@ _ENV_CONFIG = Config()
 
 TOKEN = "token"
 TYPE = "type"
+PY_CATALOG_IMPL = "py-catalog-impl"
 ICEBERG = "iceberg"
 TABLE_TYPE = "table_type"
 WAREHOUSE_LOCATION = "warehouse"
@@ -233,6 +235,19 @@ def load_catalog(name: Optional[str] = None, **properties: 
Optional[str]) -> Cat
     catalog_type: Optional[CatalogType]
     provided_catalog_type = conf.get(TYPE)
 
+    if catalog_impl := properties.get(PY_CATALOG_IMPL):
+        if provided_catalog_type:
+            raise ValueError(
+                "Must not set both catalog type and py-catalog-impl 
configurations, "
+                f"but found type {provided_catalog_type} and py-catalog-impl 
{catalog_impl}"
+            )
+
+        if catalog := _import_catalog(name, catalog_impl, properties):
+            logger.info("Loaded Catalog: %s", catalog_impl)
+            return catalog
+        else:
+            raise ValueError(f"Could not initialize Catalog: {catalog_impl}")
+
     catalog_type = None
     if provided_catalog_type and isinstance(provided_catalog_type, str):
         catalog_type = CatalogType[provided_catalog_type.upper()]
@@ -283,6 +298,20 @@ def delete_data_files(io: FileIO, manifests_to_delete: 
List[ManifestFile]) -> No
                 deleted_files[path] = True
 
 
+def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> 
Optional[Catalog]:
+    try:
+        path_parts = catalog_impl.split(".")
+        if len(path_parts) < 2:
+            raise ValueError(f"py-catalog-impl should be full path 
(module.CustomCatalog), got: {catalog_impl}")
+        module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
+        module = importlib.import_module(module_name)
+        class_ = getattr(module, class_name)
+        return class_(name, **properties)
+    except ModuleNotFoundError:
+        logger.warning("Could not initialize Catalog: %s", catalog_impl)
+        return None
+
+
 @dataclass
 class PropertiesUpdateSummary:
     removed: List[str]
diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py
index 06e9a8a3..2f53a6c2 100644
--- a/tests/catalog/test_base.py
+++ b/tests/catalog/test_base.py
@@ -32,11 +32,7 @@ import pytest
 from pydantic_core import ValidationError
 from pytest_lazyfixture import lazy_fixture
 
-from pyiceberg.catalog import (
-    Catalog,
-    MetastoreCatalog,
-    PropertiesUpdateSummary,
-)
+from pyiceberg.catalog import Catalog, MetastoreCatalog, 
PropertiesUpdateSummary, load_catalog
 from pyiceberg.exceptions import (
     NamespaceAlreadyExistsError,
     NamespaceNotEmptyError,
@@ -295,6 +291,30 @@ def given_catalog_has_a_table(
     )
 
 
+def test_load_catalog_impl_not_full_path() -> None:
+    with pytest.raises(ValueError) as exc_info:
+        load_catalog("catalog", **{"py-catalog-impl": "CustomCatalog"})
+
+    assert "py-catalog-impl should be full path (module.CustomCatalog), got: 
CustomCatalog" in str(exc_info.value)
+
+
+def test_load_catalog_impl_does_not_exist() -> None:
+    with pytest.raises(ValueError) as exc_info:
+        load_catalog("catalog", **{"py-catalog-impl": 
"pyiceberg.does.not.exist.Catalog"})
+
+    assert "Could not initialize Catalog: pyiceberg.does.not.exist.Catalog" in 
str(exc_info.value)
+
+
+def test_load_catalog_has_type_and_impl() -> None:
+    with pytest.raises(ValueError) as exc_info:
+        load_catalog("catalog", **{"py-catalog-impl": 
"pyiceberg.does.not.exist.Catalog", "type": "sql"})
+
+    assert (
+        "Must not set both catalog type and py-catalog-impl configurations, "
+        "but found type sql and py-catalog-impl 
pyiceberg.does.not.exist.Catalog" in str(exc_info.value)
+    )
+
+
 def test_namespace_from_tuple() -> None:
     # Given
     identifier = ("com", "organization", "department", "my_table")
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index f887b1ea..7b48b08a 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -27,6 +27,7 @@ from sqlalchemy.exc import ArgumentError, IntegrityError
 
 from pyiceberg.catalog import (
     Catalog,
+    load_catalog,
 )
 from pyiceberg.catalog.sql import DEFAULT_ECHO_VALUE, 
DEFAULT_POOL_PRE_PING_VALUE, SqlCatalog
 from pyiceberg.exceptions import (
@@ -210,6 +211,20 @@ def 
test_creation_with_pool_pre_ping_parameter(catalog_name: str, warehouse: Pat
         )
 
 
+def test_creation_from_impl(catalog_name: str, warehouse: Path) -> None:
+    assert isinstance(
+        load_catalog(
+            catalog_name,
+            **{
+                "py-catalog-impl": "pyiceberg.catalog.sql.SqlCatalog",
+                "uri": f"sqlite:////{warehouse}/sql-catalog.db",
+                "warehouse": f"file://{warehouse}",
+            },
+        ),
+        SqlCatalog,
+    )
+
+
 @pytest.mark.parametrize(
     "catalog",
     [

Reply via email to