This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 055938d3 Support loading custom catalog impl (#947)
055938d3 is described below
commit 055938d36d46efb94849b1d861cd8a8a111f6ae5
Author: Jack Ye <[email protected]>
AuthorDate: Thu Jul 25 11:30:36 2024 -0700
Support loading custom catalog impl (#947)
* Support loading custom catalog impl
* add more tests
* fix lint
* fix typo
* fix typo 2
---
mkdocs/docs/configuration.md | 20 +++++++++++++++++++-
pyiceberg/catalog/__init__.py | 29 +++++++++++++++++++++++++++++
tests/catalog/test_base.py | 30 +++++++++++++++++++++++++-----
tests/catalog/test_sql.py | 15 +++++++++++++++
4 files changed, 88 insertions(+), 6 deletions(-)
diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 985b8a77..ff374165 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -139,7 +139,13 @@ For the FileIO there are several configuration options
available:
## Catalogs
-PyIceberg currently has native support for REST, SQL, Hive, Glue and DynamoDB.
+PyIceberg currently has native catalog type support for REST, SQL, Hive, Glue
and DynamoDB.
+Alternatively, you can also directly set the catalog implementation:
+
+| Key | Example | Description
|
+| --------------- | ---------------------------- |
------------------------------------------------------------------------------------------------
|
+| type | rest | Type of catalog, one of
`rest`, `sql`, `hive`, `glue`, `dynamodb`. Defaults to `rest` |
+| py-catalog-impl | mypackage.mymodule.MyCatalog | Sets the catalog explicitly
to an implementation, and will fail explicitly if it can't be loaded |
There are three ways to pass in configuration:
@@ -379,6 +385,18 @@ catalog:
<!-- prettier-ignore-end -->
+### Custom Catalog Implementations
+
+If you want to load any custom catalog implementation, you can set catalog
configurations like the following:
+
+```yaml
+catalog:
+ default:
+ py-catalog-impl: mypackage.mymodule.MyCatalog
+ custom-key1: value1
+ custom-key2: value2
+```
+
## Unified AWS Credentials
You can explicitly set the AWS credentials for both Glue/DynamoDB Catalog and
S3 FileIO by configuring `client.*` properties. For example:
diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index a84bde0d..35f0a5b1 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -17,6 +17,7 @@
from __future__ import annotations
+import importlib
import logging
import re
import uuid
@@ -77,6 +78,7 @@ _ENV_CONFIG = Config()
TOKEN = "token"
TYPE = "type"
+PY_CATALOG_IMPL = "py-catalog-impl"
ICEBERG = "iceberg"
TABLE_TYPE = "table_type"
WAREHOUSE_LOCATION = "warehouse"
@@ -233,6 +235,19 @@ def load_catalog(name: Optional[str] = None, **properties:
Optional[str]) -> Cat
catalog_type: Optional[CatalogType]
provided_catalog_type = conf.get(TYPE)
+ if catalog_impl := properties.get(PY_CATALOG_IMPL):
+ if provided_catalog_type:
+ raise ValueError(
+ "Must not set both catalog type and py-catalog-impl
configurations, "
+ f"but found type {provided_catalog_type} and py-catalog-impl
{catalog_impl}"
+ )
+
+ if catalog := _import_catalog(name, catalog_impl, properties):
+ logger.info("Loaded Catalog: %s", catalog_impl)
+ return catalog
+ else:
+ raise ValueError(f"Could not initialize Catalog: {catalog_impl}")
+
catalog_type = None
if provided_catalog_type and isinstance(provided_catalog_type, str):
catalog_type = CatalogType[provided_catalog_type.upper()]
@@ -283,6 +298,20 @@ def delete_data_files(io: FileIO, manifests_to_delete:
List[ManifestFile]) -> No
deleted_files[path] = True
+def _import_catalog(name: str, catalog_impl: str, properties: Properties) ->
Optional[Catalog]:
+ try:
+ path_parts = catalog_impl.split(".")
+ if len(path_parts) < 2:
+ raise ValueError(f"py-catalog-impl should be full path
(module.CustomCatalog), got: {catalog_impl}")
+ module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
+ module = importlib.import_module(module_name)
+ class_ = getattr(module, class_name)
+ return class_(name, **properties)
+ except ModuleNotFoundError:
+ logger.warning("Could not initialize Catalog: %s", catalog_impl)
+ return None
+
+
@dataclass
class PropertiesUpdateSummary:
removed: List[str]
diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py
index 06e9a8a3..2f53a6c2 100644
--- a/tests/catalog/test_base.py
+++ b/tests/catalog/test_base.py
@@ -32,11 +32,7 @@ import pytest
from pydantic_core import ValidationError
from pytest_lazyfixture import lazy_fixture
-from pyiceberg.catalog import (
- Catalog,
- MetastoreCatalog,
- PropertiesUpdateSummary,
-)
+from pyiceberg.catalog import Catalog, MetastoreCatalog,
PropertiesUpdateSummary, load_catalog
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
@@ -295,6 +291,30 @@ def given_catalog_has_a_table(
)
+def test_load_catalog_impl_not_full_path() -> None:
+ with pytest.raises(ValueError) as exc_info:
+ load_catalog("catalog", **{"py-catalog-impl": "CustomCatalog"})
+
+ assert "py-catalog-impl should be full path (module.CustomCatalog), got:
CustomCatalog" in str(exc_info.value)
+
+
+def test_load_catalog_impl_does_not_exist() -> None:
+ with pytest.raises(ValueError) as exc_info:
+ load_catalog("catalog", **{"py-catalog-impl":
"pyiceberg.does.not.exist.Catalog"})
+
+ assert "Could not initialize Catalog: pyiceberg.does.not.exist.Catalog" in
str(exc_info.value)
+
+
+def test_load_catalog_has_type_and_impl() -> None:
+ with pytest.raises(ValueError) as exc_info:
+ load_catalog("catalog", **{"py-catalog-impl":
"pyiceberg.does.not.exist.Catalog", "type": "sql"})
+
+ assert (
+ "Must not set both catalog type and py-catalog-impl configurations, "
+ "but found type sql and py-catalog-impl
pyiceberg.does.not.exist.Catalog" in str(exc_info.value)
+ )
+
+
def test_namespace_from_tuple() -> None:
# Given
identifier = ("com", "organization", "department", "my_table")
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index f887b1ea..7b48b08a 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -27,6 +27,7 @@ from sqlalchemy.exc import ArgumentError, IntegrityError
from pyiceberg.catalog import (
Catalog,
+ load_catalog,
)
from pyiceberg.catalog.sql import DEFAULT_ECHO_VALUE,
DEFAULT_POOL_PRE_PING_VALUE, SqlCatalog
from pyiceberg.exceptions import (
@@ -210,6 +211,20 @@ def
test_creation_with_pool_pre_ping_parameter(catalog_name: str, warehouse: Pat
)
+def test_creation_from_impl(catalog_name: str, warehouse: Path) -> None:
+ assert isinstance(
+ load_catalog(
+ catalog_name,
+ **{
+ "py-catalog-impl": "pyiceberg.catalog.sql.SqlCatalog",
+ "uri": f"sqlite:////{warehouse}/sql-catalog.db",
+ "warehouse": f"file://{warehouse}",
+ },
+ ),
+ SqlCatalog,
+ )
+
+
@pytest.mark.parametrize(
"catalog",
[