This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 756ae625 Introduce hierarchical namespaces into SqlCatalog (#591)
756ae625 is described below
commit 756ae625a2ea0f9c12df78430512ce991f6a1976
Author: Eric L (CCCS) <[email protected]>
AuthorDate: Tue May 28 03:52:24 2024 -0400
Introduce hierarchical namespaces into SqlCatalog (#591)
* Introduce hierarchical namespaces into SqlCatalog
* Fix SqlCatalog unit tests broken from code update.
---
pyiceberg/catalog/__init__.py | 25 +-
pyiceberg/catalog/sql.py | 159 ++++----
pyiceberg/cli/console.py | 8 +-
tests/catalog/test_sql.py | 847 +++++++++++++++++++++++++++++++-----------
tests/conftest.py | 13 +
5 files changed, 758 insertions(+), 294 deletions(-)
diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 0b70fe32..ea2bc657 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -588,7 +588,7 @@ class Catalog(ABC):
If the identifier is a string, it is split into a tuple on '.'. If it
is a tuple, it is used as-is.
Args:
- identifier (str | Identifier: an identifier, either a string or
tuple of strings.
+ identifier (str | Identifier): an identifier, either a string or
tuple of strings.
Returns:
Identifier: a tuple of strings.
@@ -619,6 +619,29 @@ class Catalog(ABC):
"""
return Catalog.identifier_to_tuple(identifier)[:-1]
+ @staticmethod
+ def namespace_to_string(
+ identifier: Union[str, Identifier], err: Union[Type[ValueError],
Type[NoSuchNamespaceError]] = ValueError
+ ) -> str:
+ """Transform a namespace identifier into a string.
+
+ Args:
+ identifier (Union[str, Identifier]): a namespace identifier.
+ err (Union[Type[ValueError], Type[NoSuchNamespaceError]]): the
error type to raise when identifier is empty.
+
+ Returns:
+ Identifier: Namespace identifier.
+ """
+ tuple_identifier = Catalog.identifier_to_tuple(identifier)
+ if len(tuple_identifier) < 1:
+ raise err("Empty namespace identifier")
+
+ # Check if any segment of the tuple is an empty string
+ if any(segment.strip() == "" for segment in tuple_identifier):
+ raise err("Namespace identifier contains an empty segment or a
segment with only whitespace")
+
+ return ".".join(segment.strip() for segment in tuple_identifier)
+
@staticmethod
def identifier_to_database(
identifier: Union[str, Identifier], err: Union[Type[ValueError],
Type[NoSuchNamespaceError]] = ValueError
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index 978109b2..6c198767 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -43,6 +43,7 @@ from sqlalchemy.orm import (
from pyiceberg.catalog import (
METADATA_LOCATION,
+ Catalog,
MetastoreCatalog,
PropertiesUpdateSummary,
)
@@ -94,6 +95,16 @@ class IcebergNamespaceProperties(SqlCatalogBaseTable):
class SqlCatalog(MetastoreCatalog):
+ """Implementation of a SQL based catalog.
+
+ In the `JDBCCatalog` implementation, a `Namespace` is composed of a list
of strings separated by dots: `'ns1.ns2.ns3'`.
+ And you can have as many levels as you want, but you need at least one.
The `SqlCatalog` honors the same convention.
+
+ In the `JDBCCatalog` implementation, a `TableIdentifier` is composed of an
optional `Namespace` and a table name.
+ When a `Namespace` is present, the full name will be
`'ns1.ns2.ns3.table'`. A valid `TableIdentifier` could be `'name'` (no
namespace).
+ The `SqlCatalog` has a different convention where a `TableIdentifier`
requires a `Namespace`.
+ """
+
def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
@@ -136,7 +147,7 @@ class SqlCatalog(MetastoreCatalog):
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
- identifier=(self.name, table_namespace, table_name),
+ identifier=(self.name,) +
Catalog.identifier_to_tuple(table_namespace) + (table_name,),
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
@@ -173,11 +184,14 @@ class SqlCatalog(MetastoreCatalog):
"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
- database_name, table_name =
self.identifier_to_database_and_table(identifier)
- if not self._namespace_exists(database_name):
- raise NoSuchNamespaceError(f"Namespace does not exist:
{database_name}")
+ identifier_nocatalog =
self.identifier_to_tuple_without_catalog(identifier)
+ namespace_identifier = Catalog.namespace_from(identifier_nocatalog)
+ table_name = Catalog.table_name_from(identifier_nocatalog)
+ if not self._namespace_exists(namespace_identifier):
+ raise NoSuchNamespaceError(f"Namespace does not exist:
{namespace_identifier}")
- location = self._resolve_table_location(location, database_name,
table_name)
+ namespace = Catalog.namespace_to_string(namespace_identifier)
+ location = self._resolve_table_location(location, namespace,
table_name)
metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec,
sort_order=sort_order, properties=properties
@@ -190,7 +204,7 @@ class SqlCatalog(MetastoreCatalog):
session.add(
IcebergTables(
catalog_name=self.name,
- table_namespace=database_name,
+ table_namespace=namespace,
table_name=table_name,
metadata_location=metadata_location,
previous_metadata_location=None,
@@ -198,7 +212,7 @@ class SqlCatalog(MetastoreCatalog):
)
session.commit()
except IntegrityError as e:
- raise TableAlreadyExistsError(f"Table
{database_name}.{table_name} already exists") from e
+ raise TableAlreadyExistsError(f"Table {namespace}.{table_name}
already exists") from e
return self.load_table(identifier=identifier)
@@ -216,16 +230,19 @@ class SqlCatalog(MetastoreCatalog):
TableAlreadyExistsError: If the table already exists
NoSuchNamespaceError: If namespace does not exist
"""
- database_name, table_name =
self.identifier_to_database_and_table(identifier)
- if not self._namespace_exists(database_name):
- raise NoSuchNamespaceError(f"Namespace does not exist:
{database_name}")
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+ namespace_tuple = Catalog.namespace_from(identifier_tuple)
+ namespace = Catalog.namespace_to_string(namespace_tuple)
+ table_name = Catalog.table_name_from(identifier_tuple)
+ if not self._namespace_exists(namespace):
+ raise NoSuchNamespaceError(f"Namespace does not exist:
{namespace}")
with Session(self.engine) as session:
try:
session.add(
IcebergTables(
catalog_name=self.name,
- table_namespace=database_name,
+ table_namespace=namespace,
table_name=table_name,
metadata_location=metadata_location,
previous_metadata_location=None,
@@ -233,7 +250,7 @@ class SqlCatalog(MetastoreCatalog):
)
session.commit()
except IntegrityError as e:
- raise TableAlreadyExistsError(f"Table
{database_name}.{table_name} already exists") from e
+ raise TableAlreadyExistsError(f"Table {namespace}.{table_name}
already exists") from e
return self.load_table(identifier=identifier)
@@ -253,17 +270,19 @@ class SqlCatalog(MetastoreCatalog):
NoSuchTableError: If a table with the name does not exist.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
- database_name, table_name =
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
+ namespace_tuple = Catalog.namespace_from(identifier_tuple)
+ namespace = Catalog.namespace_to_string(namespace_tuple)
+ table_name = Catalog.table_name_from(identifier_tuple)
with Session(self.engine) as session:
stmt = select(IcebergTables).where(
IcebergTables.catalog_name == self.name,
- IcebergTables.table_namespace == database_name,
+ IcebergTables.table_namespace == namespace,
IcebergTables.table_name == table_name,
)
result = session.scalar(stmt)
if result:
return self._convert_orm_to_iceberg(result)
- raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}")
+ raise NoSuchTableError(f"Table does not exist:
{namespace}.{table_name}")
def drop_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table.
@@ -275,18 +294,20 @@ class SqlCatalog(MetastoreCatalog):
NoSuchTableError: If a table with the name does not exist.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
- database_name, table_name =
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
+ namespace_tuple = Catalog.namespace_from(identifier_tuple)
+ namespace = Catalog.namespace_to_string(namespace_tuple)
+ table_name = Catalog.table_name_from(identifier_tuple)
with Session(self.engine) as session:
if self.engine.dialect.supports_sane_rowcount:
res = session.execute(
delete(IcebergTables).where(
IcebergTables.catalog_name == self.name,
- IcebergTables.table_namespace == database_name,
+ IcebergTables.table_namespace == namespace,
IcebergTables.table_name == table_name,
)
)
if res.rowcount < 1:
- raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}")
+ raise NoSuchTableError(f"Table does not exist:
{namespace}.{table_name}")
else:
try:
tbl = (
@@ -294,14 +315,14 @@ class SqlCatalog(MetastoreCatalog):
.with_for_update(of=IcebergTables)
.filter(
IcebergTables.catalog_name == self.name,
- IcebergTables.table_namespace == database_name,
+ IcebergTables.table_namespace == namespace,
IcebergTables.table_name == table_name,
)
.one()
)
session.delete(tbl)
except NoResultFound as e:
- raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}") from e
+ raise NoSuchTableError(f"Table does not exist:
{namespace}.{table_name}") from e
session.commit()
def rename_table(self, from_identifier: Union[str, Identifier],
to_identifier: Union[str, Identifier]) -> Table:
@@ -320,10 +341,15 @@ class SqlCatalog(MetastoreCatalog):
NoSuchNamespaceError: If the target namespace does not exist.
"""
from_identifier_tuple =
self.identifier_to_tuple_without_catalog(from_identifier)
- from_database_name, from_table_name =
self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
- to_database_name, to_table_name =
self.identifier_to_database_and_table(to_identifier)
- if not self._namespace_exists(to_database_name):
- raise NoSuchNamespaceError(f"Namespace does not exist:
{to_database_name}")
+ to_identifier_tuple =
self.identifier_to_tuple_without_catalog(to_identifier)
+ from_namespace_tuple = Catalog.namespace_from(from_identifier_tuple)
+ from_namespace = Catalog.namespace_to_string(from_namespace_tuple)
+ from_table_name = Catalog.table_name_from(from_identifier_tuple)
+ to_namespace_tuple = Catalog.namespace_from(to_identifier_tuple)
+ to_namespace = Catalog.namespace_to_string(to_namespace_tuple)
+ to_table_name = Catalog.table_name_from(to_identifier_tuple)
+ if not self._namespace_exists(to_namespace):
+ raise NoSuchNamespaceError(f"Namespace does not exist:
{to_namespace}")
with Session(self.engine) as session:
try:
if self.engine.dialect.supports_sane_rowcount:
@@ -331,10 +357,10 @@ class SqlCatalog(MetastoreCatalog):
update(IcebergTables)
.where(
IcebergTables.catalog_name == self.name,
- IcebergTables.table_namespace ==
from_database_name,
+ IcebergTables.table_namespace == from_namespace,
IcebergTables.table_name == from_table_name,
)
- .values(table_namespace=to_database_name,
table_name=to_table_name)
+ .values(table_namespace=to_namespace,
table_name=to_table_name)
)
result = session.execute(stmt)
if result.rowcount < 1:
@@ -346,18 +372,18 @@ class SqlCatalog(MetastoreCatalog):
.with_for_update(of=IcebergTables)
.filter(
IcebergTables.catalog_name == self.name,
- IcebergTables.table_namespace ==
from_database_name,
+ IcebergTables.table_namespace ==
from_namespace,
IcebergTables.table_name == from_table_name,
)
.one()
)
- tbl.table_namespace = to_database_name
+ tbl.table_namespace = to_namespace
tbl.table_name = to_table_name
except NoResultFound as e:
raise NoSuchTableError(f"Table does not exist:
{from_table_name}") from e
session.commit()
except IntegrityError as e:
- raise TableAlreadyExistsError(f"Table
{to_database_name}.{to_table_name} already exists") from e
+ raise TableAlreadyExistsError(f"Table
{to_namespace}.{to_table_name} already exists") from e
return self.load_table(to_identifier)
def _commit_table(self, table_request: CommitTableRequest) ->
CommitTableResponse:
@@ -377,7 +403,9 @@ class SqlCatalog(MetastoreCatalog):
tuple(table_request.identifier.namespace.root +
[table_request.identifier.name])
)
current_table = self.load_table(identifier_tuple)
- database_name, table_name =
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
+ namespace_tuple = Catalog.namespace_from(identifier_tuple)
+ namespace = Catalog.namespace_to_string(namespace_tuple)
+ table_name = Catalog.table_name_from(identifier_tuple)
base_metadata = current_table.metadata
for requirement in table_request.requirements:
requirement.validate(base_metadata)
@@ -398,7 +426,7 @@ class SqlCatalog(MetastoreCatalog):
update(IcebergTables)
.where(
IcebergTables.catalog_name == self.name,
- IcebergTables.table_namespace == database_name,
+ IcebergTables.table_namespace == namespace,
IcebergTables.table_name == table_name,
IcebergTables.metadata_location ==
current_table.metadata_location,
)
@@ -406,7 +434,7 @@ class SqlCatalog(MetastoreCatalog):
)
result = session.execute(stmt)
if result.rowcount < 1:
- raise CommitFailedException(f"Table has been updated by
another process: {database_name}.{table_name}")
+ raise CommitFailedException(f"Table has been updated by
another process: {namespace}.{table_name}")
else:
try:
tbl = (
@@ -414,7 +442,7 @@ class SqlCatalog(MetastoreCatalog):
.with_for_update(of=IcebergTables)
.filter(
IcebergTables.catalog_name == self.name,
- IcebergTables.table_namespace == database_name,
+ IcebergTables.table_namespace == namespace,
IcebergTables.table_name == table_name,
IcebergTables.metadata_location ==
current_table.metadata_location,
)
@@ -423,13 +451,14 @@ class SqlCatalog(MetastoreCatalog):
tbl.metadata_location = new_metadata_location
tbl.previous_metadata_location =
current_table.metadata_location
except NoResultFound as e:
- raise CommitFailedException(f"Table has been updated by
another process: {database_name}.{table_name}") from e
+ raise CommitFailedException(f"Table has been updated by
another process: {namespace}.{table_name}") from e
session.commit()
return CommitTableResponse(metadata=updated_metadata,
metadata_location=new_metadata_location)
def _namespace_exists(self, identifier: Union[str, Identifier]) -> bool:
- namespace = self.identifier_to_database(identifier)
+ namespace_tuple = Catalog.identifier_to_tuple(identifier)
+ namespace = Catalog.namespace_to_string(namespace_tuple,
NoSuchNamespaceError)
with Session(self.engine) as session:
stmt = (
select(IcebergTables)
@@ -462,18 +491,20 @@ class SqlCatalog(MetastoreCatalog):
Raises:
NamespaceAlreadyExistsError: If a namespace with the given name
already exists.
"""
+ if self._namespace_exists(namespace):
+ raise NamespaceAlreadyExistsError(f"Namespace {namespace} already
exists")
+
if not properties:
properties =
IcebergNamespaceProperties.NAMESPACE_MINIMAL_PROPERTIES
- database_name = self.identifier_to_database(namespace)
- if self._namespace_exists(database_name):
- raise NamespaceAlreadyExistsError(f"Database {database_name}
already exists")
-
create_properties = properties if properties else
IcebergNamespaceProperties.NAMESPACE_MINIMAL_PROPERTIES
with Session(self.engine) as session:
for key, value in create_properties.items():
session.add(
IcebergNamespaceProperties(
- catalog_name=self.name, namespace=database_name,
property_key=key, property_value=value
+ catalog_name=self.name,
+ namespace=Catalog.namespace_to_string(namespace,
NoSuchNamespaceError),
+ property_key=key,
+ property_value=value,
)
)
session.commit()
@@ -488,16 +519,16 @@ class SqlCatalog(MetastoreCatalog):
NoSuchNamespaceError: If a namespace with the given name does not
exist.
NamespaceNotEmptyError: If the namespace is not empty.
"""
- database_name = self.identifier_to_database(namespace,
NoSuchNamespaceError)
- if self._namespace_exists(database_name):
- if tables := self.list_tables(database_name):
- raise NamespaceNotEmptyError(f"Database {database_name} is not
empty. {len(tables)} tables exist.")
+ if self._namespace_exists(namespace):
+ namespace_str = Catalog.namespace_to_string(namespace)
+ if tables := self.list_tables(namespace):
+ raise NamespaceNotEmptyError(f"Namespace {namespace_str} is
not empty. {len(tables)} tables exist.")
with Session(self.engine) as session:
session.execute(
delete(IcebergNamespaceProperties).where(
IcebergNamespaceProperties.catalog_name == self.name,
- IcebergNamespaceProperties.namespace == database_name,
+ IcebergNamespaceProperties.namespace == namespace_str,
)
)
session.commit()
@@ -516,14 +547,14 @@ class SqlCatalog(MetastoreCatalog):
Raises:
NoSuchNamespaceError: If a namespace with the given name does not
exist.
"""
- database_name = self.identifier_to_database(namespace,
NoSuchNamespaceError)
+ if namespace and not self._namespace_exists(namespace):
+ raise NoSuchNamespaceError(f"Namespace does not exist:
{namespace}")
- stmt = select(IcebergTables).where(
- IcebergTables.catalog_name == self.name,
IcebergTables.table_namespace == database_name
- )
+ namespace = Catalog.namespace_to_string(namespace)
+ stmt = select(IcebergTables).where(IcebergTables.catalog_name ==
self.name, IcebergTables.table_namespace == namespace)
with Session(self.engine) as session:
result = session.scalars(stmt)
- return [(table.table_namespace, table.table_name) for table in
result]
+ return [(Catalog.identifier_to_tuple(table.table_namespace) +
(table.table_name,)) for table in result]
def list_namespaces(self, namespace: Union[str, Identifier] = ()) ->
List[Identifier]:
"""List namespaces from the given namespace. If not given, list
top-level namespaces from the catalog.
@@ -543,15 +574,15 @@ class SqlCatalog(MetastoreCatalog):
table_stmt =
select(IcebergTables.table_namespace).where(IcebergTables.catalog_name ==
self.name)
namespace_stmt =
select(IcebergNamespaceProperties.namespace).where(IcebergNamespaceProperties.catalog_name
== self.name)
if namespace:
- database_name = self.identifier_to_database(namespace,
NoSuchNamespaceError)
- table_stmt =
table_stmt.where(IcebergTables.table_namespace.like(database_name))
- namespace_stmt =
namespace_stmt.where(IcebergNamespaceProperties.namespace.like(database_name))
+ namespace_str = Catalog.namespace_to_string(namespace,
NoSuchNamespaceError)
+ table_stmt =
table_stmt.where(IcebergTables.table_namespace.like(namespace_str))
+ namespace_stmt =
namespace_stmt.where(IcebergNamespaceProperties.namespace.like(namespace_str))
stmt = union(
table_stmt,
namespace_stmt,
)
with Session(self.engine) as session:
- return [self.identifier_to_tuple(namespace_col) for namespace_col
in session.execute(stmt).scalars()]
+ return [Catalog.identifier_to_tuple(namespace_col) for
namespace_col in session.execute(stmt).scalars()]
def load_namespace_properties(self, namespace: Union[str, Identifier]) ->
Properties:
"""Get properties for a namespace.
@@ -565,12 +596,12 @@ class SqlCatalog(MetastoreCatalog):
Raises:
NoSuchNamespaceError: If a namespace with the given name does not
exist.
"""
- database_name = self.identifier_to_database(namespace)
- if not self._namespace_exists(database_name):
- raise NoSuchNamespaceError(f"Database {database_name} does not
exists")
+ namespace_str = Catalog.namespace_to_string(namespace)
+ if not self._namespace_exists(namespace):
+ raise NoSuchNamespaceError(f"Namespace {namespace_str} does not
exists")
stmt = select(IcebergNamespaceProperties).where(
- IcebergNamespaceProperties.catalog_name == self.name,
IcebergNamespaceProperties.namespace == database_name
+ IcebergNamespaceProperties.catalog_name == self.name,
IcebergNamespaceProperties.namespace == namespace_str
)
with Session(self.engine) as session:
result = session.scalars(stmt)
@@ -590,9 +621,9 @@ class SqlCatalog(MetastoreCatalog):
NoSuchNamespaceError: If a namespace with the given name does not
exist.
ValueError: If removals and updates have overlapping keys.
"""
- database_name = self.identifier_to_database(namespace)
- if not self._namespace_exists(database_name):
- raise NoSuchNamespaceError(f"Database {database_name} does not
exists")
+ namespace_str = Catalog.namespace_to_string(namespace)
+ if not self._namespace_exists(namespace):
+ raise NoSuchNamespaceError(f"Namespace {namespace_str} does not
exists")
current_properties =
self.load_namespace_properties(namespace=namespace)
properties_update_summary = self._get_updated_props_and_update_summary(
@@ -603,7 +634,7 @@ class SqlCatalog(MetastoreCatalog):
if removals:
delete_stmt = delete(IcebergNamespaceProperties).where(
IcebergNamespaceProperties.catalog_name == self.name,
- IcebergNamespaceProperties.namespace == database_name,
+ IcebergNamespaceProperties.namespace == namespace_str,
IcebergNamespaceProperties.property_key.in_(removals),
)
session.execute(delete_stmt)
@@ -614,14 +645,14 @@ class SqlCatalog(MetastoreCatalog):
# This is not a problem since it runs in a single transaction
delete_stmt = delete(IcebergNamespaceProperties).where(
IcebergNamespaceProperties.catalog_name == self.name,
- IcebergNamespaceProperties.namespace == database_name,
+ IcebergNamespaceProperties.namespace == namespace_str,
IcebergNamespaceProperties.property_key.in_(set(updates.keys())),
)
session.execute(delete_stmt)
insert_stmt = insert(IcebergNamespaceProperties)
for property_key, property_value in updates.items():
insert_stmt = insert_stmt.values(
- catalog_name=self.name, namespace=database_name,
property_key=property_key, property_value=property_value
+ catalog_name=self.name, namespace=namespace_str,
property_key=property_key, property_value=property_value
)
session.execute(insert_stmt)
session.commit()
diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py
index 0fbda109..d1833df0 100644
--- a/pyiceberg/cli/console.py
+++ b/pyiceberg/cli/console.py
@@ -112,9 +112,13 @@ def list(ctx: Context, parent: Optional[str]) -> None: #
pylint: disable=redefi
"""List tables or namespaces."""
catalog, output = _catalog_and_output(ctx)
- identifiers = catalog.list_namespaces(parent or ())
- if not identifiers and parent:
+ identifiers = []
+ if parent:
+ # Do we have tables under parent namespace?
identifiers = catalog.list_tables(parent)
+ if not identifiers:
+ # List hierarchical namespaces if parent, root namespaces otherwise.
+ identifiers = catalog.list_namespaces(parent or ())
output.identifiers(identifiers)
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index efa7b746..285cfd9a 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -17,7 +17,7 @@
import os
from pathlib import Path
-from typing import Generator, List
+from typing import Any, Generator, List
import pyarrow as pa
import pytest
@@ -25,6 +25,9 @@ from pydantic_core import ValidationError
from pytest_lazyfixture import lazy_fixture
from sqlalchemy.exc import ArgumentError, IntegrityError
+from pyiceberg.catalog import (
+ Catalog,
+)
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import (
CommitFailedException,
@@ -52,51 +55,90 @@ from pyiceberg.typedef import Identifier
from pyiceberg.types import IntegerType
-@pytest.fixture(name="random_identifier")
-def fixture_random_identifier(warehouse: Path, database_name: str, table_name:
str) -> Identifier:
+@pytest.fixture(scope="module")
+def catalog_name() -> str:
+ return "test_sql_catalog"
+
+
+@pytest.fixture(name="random_table_identifier")
+def fixture_random_table_identifier(warehouse: Path, database_name: str,
table_name: str) -> Identifier:
os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/",
exist_ok=True)
return database_name, table_name
-@pytest.fixture(name="another_random_identifier")
-def fixture_another_random_identifier(warehouse: Path, database_name: str,
table_name: str) -> Identifier:
+@pytest.fixture(name="random_table_identifier_with_catalog")
+def fixture_random_table_identifier_with_catalog(
+ warehouse: Path, catalog_name: str, database_name: str, table_name: str
+) -> Identifier:
+ os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/",
exist_ok=True)
+ return catalog_name, database_name, table_name
+
+
+@pytest.fixture(name="another_random_table_identifier")
+def fixture_another_random_table_identifier(warehouse: Path, database_name:
str, table_name: str) -> Identifier:
database_name = database_name + "_new"
table_name = table_name + "_new"
os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/",
exist_ok=True)
return database_name, table_name
+@pytest.fixture(name="another_random_table_identifier_with_catalog")
+def fixture_another_random_table_identifier_with_catalog(
+ warehouse: Path, catalog_name: str, database_name: str, table_name: str
+) -> Identifier:
+ database_name = database_name + "_new"
+ table_name = table_name + "_new"
+ os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/",
exist_ok=True)
+ return catalog_name, database_name, table_name
+
+
+@pytest.fixture(name="random_hierarchical_identifier")
+def fixture_random_hierarchical_identifier(warehouse: Path,
hierarchical_namespace_name: str, table_name: str) -> Identifier:
+
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}.db/{table_name}/metadata/",
exist_ok=True)
+ return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name,
table_name)))
+
+
+@pytest.fixture(name="another_random_hierarchical_identifier")
+def fixture_another_random_hierarchical_identifier(
+ warehouse: Path, hierarchical_namespace_name: str, table_name: str
+) -> Identifier:
+ hierarchical_namespace_name = hierarchical_namespace_name + "_new"
+ table_name = table_name + "_new"
+
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}.db/{table_name}/metadata/",
exist_ok=True)
+ return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name,
table_name)))
+
+
@pytest.fixture(scope="module")
-def catalog_memory(warehouse: Path) -> Generator[SqlCatalog, None, None]:
+def catalog_memory(catalog_name: str, warehouse: Path) ->
Generator[SqlCatalog, None, None]:
props = {
"uri": "sqlite:///:memory:",
"warehouse": f"file://{warehouse}",
}
- catalog = SqlCatalog("test_sql_catalog", **props)
+ catalog = SqlCatalog(catalog_name, **props)
catalog.create_tables()
yield catalog
catalog.destroy_tables()
@pytest.fixture(scope="module")
-def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
+def catalog_sqlite(catalog_name: str, warehouse: Path) ->
Generator[SqlCatalog, None, None]:
props = {
"uri": f"sqlite:////{warehouse}/sql-catalog.db",
"warehouse": f"file://{warehouse}",
}
- catalog = SqlCatalog("test_sql_catalog", **props)
+ catalog = SqlCatalog(catalog_name, **props)
catalog.create_tables()
yield catalog
catalog.destroy_tables()
@pytest.fixture(scope="module")
-def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog,
None, None]:
+def catalog_sqlite_without_rowcount(catalog_name: str, warehouse: Path) ->
Generator[SqlCatalog, None, None]:
props = {
"uri": f"sqlite:////{warehouse}/sql-catalog.db",
"warehouse": f"file://{warehouse}",
}
- catalog = SqlCatalog("test_sql_catalog", **props)
+ catalog = SqlCatalog(catalog_name, **props)
catalog.engine.dialect.supports_sane_rowcount = False
catalog.create_tables()
yield catalog
@@ -104,26 +146,26 @@ def catalog_sqlite_without_rowcount(warehouse: Path) ->
Generator[SqlCatalog, No
@pytest.fixture(scope="module")
-def catalog_sqlite_fsspec(warehouse: Path) -> Generator[SqlCatalog, None,
None]:
+def catalog_sqlite_fsspec(catalog_name: str, warehouse: Path) ->
Generator[SqlCatalog, None, None]:
props = {
"uri": f"sqlite:////{warehouse}/sql-catalog.db",
"warehouse": f"file://{warehouse}",
PY_IO_IMPL: FSSPEC_FILE_IO,
}
- catalog = SqlCatalog("test_sql_catalog", **props)
+ catalog = SqlCatalog(catalog_name, **props)
catalog.create_tables()
yield catalog
catalog.destroy_tables()
-def test_creation_with_no_uri() -> None:
+def test_creation_with_no_uri(catalog_name: str) -> None:
with pytest.raises(NoSuchPropertyException):
- SqlCatalog("test_ddb_catalog", not_uri="unused")
+ SqlCatalog(catalog_name, not_uri="unused")
-def test_creation_with_unsupported_uri() -> None:
+def test_creation_with_unsupported_uri(catalog_name: str) -> None:
with pytest.raises(ArgumentError):
- SqlCatalog("test_ddb_catalog", uri="unsupported:xxx")
+ SqlCatalog(catalog_name, uri="unsupported:xxx")
@pytest.mark.parametrize(
@@ -146,13 +188,22 @@ def test_create_tables_idempotency(catalog: SqlCatalog)
-> None:
lazy_fixture('catalog_sqlite'),
],
)
-def test_create_table_default_sort_order(catalog: SqlCatalog,
table_schema_nested: Schema, random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table = catalog.create_table(random_identifier, table_schema_nested)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_create_table_default_sort_order(catalog: SqlCatalog,
table_schema_nested: Schema, table_identifier: Identifier) -> None:
+ table_identifier_nocatalog =
catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(table_identifier, table_schema_nested)
assert table.sort_order().order_id == 0, "Order ID must match"
assert table.sort_order().is_unsorted is True, "Order must be unsorted"
- catalog.drop_table(random_identifier)
+ catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
@@ -162,15 +213,24 @@ def test_create_table_default_sort_order(catalog:
SqlCatalog, table_schema_neste
lazy_fixture('catalog_sqlite'),
],
)
-def test_create_v1_table(catalog: SqlCatalog, table_schema_nested: Schema,
random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table = catalog.create_table(random_identifier, table_schema_nested,
properties={"format-version": "1"})
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_create_v1_table(catalog: SqlCatalog, table_schema_nested: Schema,
table_identifier: Identifier) -> None:
+ table_identifier_nocatalog =
catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(table_identifier, table_schema_nested,
properties={"format-version": "1"})
assert table.sort_order().order_id == 0, "Order ID must match"
assert table.sort_order().is_unsorted is True, "Order must be unsorted"
assert table.format_version == 1
assert table.spec() == UNPARTITIONED_PARTITION_SPEC
- catalog.drop_table(random_identifier)
+ catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
@@ -180,17 +240,26 @@ def test_create_v1_table(catalog: SqlCatalog,
table_schema_nested: Schema, rando
lazy_fixture('catalog_sqlite'),
],
)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
def test_create_table_with_pyarrow_schema(
catalog: SqlCatalog,
pyarrow_schema_simple_without_ids: pa.Schema,
iceberg_table_schema_simple: Schema,
- random_identifier: Identifier,
+ table_identifier: Identifier,
) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table = catalog.create_table(random_identifier, pyarrow_schema_simple_without_ids)
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(table_identifier, pyarrow_schema_simple_without_ids)
assert table.schema() == iceberg_table_schema_simple
- catalog.drop_table(random_identifier)
+ catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
@@ -200,7 +269,15 @@ def test_create_table_with_pyarrow_schema(
lazy_fixture('catalog_sqlite'),
],
)
-def test_write_pyarrow_schema(catalog: SqlCatalog, random_identifier: Identifier) -> None:
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_write_pyarrow_schema(catalog: SqlCatalog, table_identifier: Identifier) -> None:
import pyarrow as pa
pyarrow_table = pa.Table.from_arrays(
@@ -217,9 +294,10 @@ def test_write_pyarrow_schema(catalog: SqlCatalog, random_identifier: Identifier
pa.field('large', pa.large_string(), nullable=True),
]),
)
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table = catalog.create_table(random_identifier, pyarrow_table.schema)
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(table_identifier, pyarrow_table.schema)
table.overwrite(pyarrow_table)
@@ -230,18 +308,27 @@ def test_write_pyarrow_schema(catalog: SqlCatalog, random_identifier: Identifier
lazy_fixture('catalog_sqlite'),
],
)
-def test_create_table_custom_sort_order(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_create_table_custom_sort_order(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
 order = SortOrder(SortField(source_id=2, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST))
- table = catalog.create_table(random_identifier, table_schema_nested, sort_order=order)
+ table = catalog.create_table(table_identifier, table_schema_nested, sort_order=order)
given_sort_order = table.sort_order()
assert given_sort_order.order_id == 1, "Order ID must match"
assert len(given_sort_order.fields) == 1, "Order must have 1 field"
 assert given_sort_order.fields[0].direction == SortDirection.ASC, "Direction must match"
 assert given_sort_order.fields[0].null_order == NullOrder.NULLS_FIRST, "Null order must match"
 assert isinstance(given_sort_order.fields[0].transform, IdentityTransform), "Transform must match"
- catalog.drop_table(random_identifier)
+ catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
@@ -251,17 +338,26 @@ def test_create_table_custom_sort_order(catalog: SqlCatalog, table_schema_nested
lazy_fixture('catalog_sqlite'),
],
)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
def test_create_table_with_default_warehouse_location(
- warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier
+ warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier
) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- catalog.create_table(random_identifier, table_schema_nested)
- table = catalog.load_table(random_identifier)
- assert table.identifier == (catalog.name,) + random_identifier
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ catalog.create_table(table_identifier, table_schema_nested)
+ table = catalog.load_table(table_identifier)
+ assert table.identifier == (catalog.name,) + table_identifier_nocatalog
assert table.metadata_location.startswith(f"file://{warehouse}")
assert os.path.exists(table.metadata_location[len("file://") :])
- catalog.drop_table(random_identifier)
+ catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
@@ -271,19 +367,29 @@ def test_create_table_with_default_warehouse_location(
lazy_fixture('catalog_sqlite'),
],
)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
def test_create_table_with_given_location_removes_trailing_slash(
- warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier
+ warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier
) -> None:
- database_name, table_name = random_identifier
- location = f"file://{warehouse}/{database_name}.db/{table_name}-given"
- catalog.create_namespace(database_name)
- catalog.create_table(random_identifier, table_schema_nested, location=f"{location}/")
- table = catalog.load_table(random_identifier)
- assert table.identifier == (catalog.name,) + random_identifier
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ table_name = Catalog.table_name_from(table_identifier_nocatalog)
+ location = f"file://{warehouse}/{catalog.name}.db/{table_name}-given"
+ catalog.create_namespace(namespace)
+ catalog.create_table(table_identifier, table_schema_nested, location=f"{location}/")
+ table = catalog.load_table(table_identifier)
+ assert table.identifier == (catalog.name,) + table_identifier_nocatalog
assert table.metadata_location.startswith(f"file://{warehouse}")
assert os.path.exists(table.metadata_location[len("file://") :])
assert table.location() == location
- catalog.drop_table(random_identifier)
+ catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
@@ -293,12 +399,21 @@ def test_create_table_with_given_location_removes_trailing_slash(
lazy_fixture('catalog_sqlite'),
],
)
-def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- catalog.create_table(random_identifier, table_schema_nested)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ catalog.create_table(table_identifier, table_schema_nested)
with pytest.raises(TableAlreadyExistsError):
- catalog.create_table(random_identifier, table_schema_nested)
+ catalog.create_table(table_identifier, table_schema_nested)
@pytest.mark.parametrize(
@@ -308,13 +423,22 @@ def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: Schem
lazy_fixture('catalog_sqlite'),
],
)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
def test_create_table_if_not_exists_duplicated_table(
- catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier
+ catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier
) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table1 = catalog.create_table(random_identifier, table_schema_nested)
- table2 = catalog.create_table_if_not_exists(random_identifier, table_schema_nested)
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table1 = catalog.create_table(table_identifier, table_schema_nested)
+ table2 = catalog.create_table_if_not_exists(table_identifier, table_schema_nested)
assert table1.identifier == table2.identifier
@@ -339,7 +463,7 @@ def test_create_table_with_non_existing_namespace(catalog: SqlCatalog, table_sch
],
)
 def test_create_table_without_namespace(catalog: SqlCatalog, table_schema_nested: Schema, table_name: str) -> None:
- with pytest.raises(ValueError):
+ with pytest.raises(NoSuchNamespaceError):
catalog.create_table(table_name, table_schema_nested)
@@ -350,14 +474,23 @@ def test_create_table_without_namespace(catalog: SqlCatalog, table_schema_nested
lazy_fixture('catalog_sqlite'),
],
)
-def test_register_table(catalog: SqlCatalog, random_identifier: Identifier, metadata_location: str) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table = catalog.register_table(random_identifier, metadata_location)
- assert table.identifier == (catalog.name,) + random_identifier
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_register_table(catalog: SqlCatalog, table_identifier: Identifier, metadata_location: str) -> None:
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table = catalog.register_table(table_identifier, metadata_location)
+ assert table.identifier == (catalog.name,) + table_identifier_nocatalog
assert table.metadata_location == metadata_location
assert os.path.exists(metadata_location)
- catalog.drop_table(random_identifier)
+ catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
@@ -367,12 +500,21 @@ def test_register_table(catalog: SqlCatalog, random_identifier: Identifier, meta
lazy_fixture('catalog_sqlite'),
],
)
-def test_register_existing_table(catalog: SqlCatalog, random_identifier: Identifier, metadata_location: str) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- catalog.register_table(random_identifier, metadata_location)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_register_existing_table(catalog: SqlCatalog, table_identifier: Identifier, metadata_location: str) -> None:
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ catalog.register_table(table_identifier, metadata_location)
with pytest.raises(TableAlreadyExistsError):
- catalog.register_table(random_identifier, metadata_location)
+ catalog.register_table(table_identifier, metadata_location)
@pytest.mark.parametrize(
@@ -407,11 +549,20 @@ def test_register_table_without_namespace(catalog: SqlCatalog, metadata_location
lazy_fixture('catalog_sqlite'),
],
)
-def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table = catalog.create_table(random_identifier, table_schema_nested)
- loaded_table = catalog.load_table(random_identifier)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(table_identifier, table_schema_nested)
+ loaded_table = catalog.load_table(table_identifier)
assert table.identifier == loaded_table.identifier
assert table.metadata_location == loaded_table.metadata_location
assert table.metadata == loaded_table.metadata
@@ -424,12 +575,21 @@ def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema, random_ide
lazy_fixture('catalog_sqlite'),
],
)
-def test_load_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table = catalog.create_table(random_identifier, table_schema_nested)
- intermediate = catalog.load_table(random_identifier)
- assert intermediate.identifier == (catalog.name,) + random_identifier
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_load_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(table_identifier, table_schema_nested)
+ intermediate = catalog.load_table(table_identifier)
+ assert intermediate.identifier == (catalog.name,) + table_identifier_nocatalog
loaded_table = catalog.load_table(intermediate.identifier)
assert table.identifier == loaded_table.identifier
assert table.metadata_location == loaded_table.metadata_location
@@ -444,14 +604,23 @@ def test_load_table_from_self_identifier(catalog: SqlCatalog, table_schema_neste
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
-def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table = catalog.create_table(random_identifier, table_schema_nested)
- assert table.identifier == (catalog.name,) + random_identifier
- catalog.drop_table(random_identifier)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(table_identifier, table_schema_nested)
+ assert table.identifier == (catalog.name,) + table_identifier_nocatalog
+ catalog.drop_table(table_identifier)
with pytest.raises(NoSuchTableError):
- catalog.load_table(random_identifier)
+ catalog.load_table(table_identifier)
@pytest.mark.parametrize(
@@ -462,16 +631,25 @@ def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, random_ide
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
-def test_drop_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table = catalog.create_table(random_identifier, table_schema_nested)
- assert table.identifier == (catalog.name,) + random_identifier
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_drop_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
+ table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(table_identifier, table_schema_nested)
+ assert table.identifier == (catalog.name,) + table_identifier_nocatalog
catalog.drop_table(table.identifier)
with pytest.raises(NoSuchTableError):
catalog.load_table(table.identifier)
with pytest.raises(NoSuchTableError):
- catalog.load_table(random_identifier)
+ catalog.load_table(table_identifier)
@pytest.mark.parametrize(
@@ -482,9 +660,17 @@ def test_drop_table_from_self_identifier(catalog: SqlCatalog, table_schema_neste
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
-def test_drop_table_that_does_not_exist(catalog: SqlCatalog, random_identifier: Identifier) -> None:
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_drop_table_that_does_not_exist(catalog: SqlCatalog, table_identifier: Identifier) -> None:
with pytest.raises(NoSuchTableError):
- catalog.drop_table(random_identifier)
+ catalog.drop_table(table_identifier)
@pytest.mark.parametrize(
@@ -495,21 +681,39 @@ def test_drop_table_that_does_not_exist(catalog: SqlCatalog, random_identifier:
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
+@pytest.mark.parametrize(
+ "from_table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+@pytest.mark.parametrize(
+ "to_table_identifier",
+ [
+ lazy_fixture("another_random_table_identifier"),
+ lazy_fixture("another_random_hierarchical_identifier"),
+ lazy_fixture("another_random_table_identifier_with_catalog"),
+ ],
+)
def test_rename_table(
- catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier
+ catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier
) -> None:
- from_database_name, _from_table_name = random_identifier
- to_database_name, _to_table_name = another_random_identifier
- catalog.create_namespace(from_database_name)
- catalog.create_namespace(to_database_name)
- table = catalog.create_table(random_identifier, table_schema_nested)
- assert table.identifier == (catalog.name,) + random_identifier
- catalog.rename_table(random_identifier, another_random_identifier)
- new_table = catalog.load_table(another_random_identifier)
- assert new_table.identifier == (catalog.name,) + another_random_identifier
+ from_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(from_table_identifier)
+ to_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(to_table_identifier)
+ from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog)
+ to_namespace = Catalog.namespace_from(to_table_identifier_nocatalog)
+ catalog.create_namespace(from_namespace)
+ catalog.create_namespace(to_namespace)
+ table = catalog.create_table(from_table_identifier, table_schema_nested)
+ assert table.identifier == (catalog.name,) + from_table_identifier_nocatalog
+ catalog.rename_table(from_table_identifier, to_table_identifier)
+ new_table = catalog.load_table(to_table_identifier)
+ assert new_table.identifier == (catalog.name,) + to_table_identifier_nocatalog
assert new_table.metadata_location == table.metadata_location
with pytest.raises(NoSuchTableError):
- catalog.load_table(random_identifier)
+ catalog.load_table(from_table_identifier)
@pytest.mark.parametrize(
@@ -520,23 +724,41 @@ def test_rename_table(
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
+@pytest.mark.parametrize(
+ "from_table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+@pytest.mark.parametrize(
+ "to_table_identifier",
+ [
+ lazy_fixture("another_random_table_identifier"),
+ lazy_fixture("another_random_hierarchical_identifier"),
+ lazy_fixture("another_random_table_identifier_with_catalog"),
+ ],
+)
def test_rename_table_from_self_identifier(
- catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier
+ catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier
) -> None:
- from_database_name, _from_table_name = random_identifier
- to_database_name, _to_table_name = another_random_identifier
- catalog.create_namespace(from_database_name)
- catalog.create_namespace(to_database_name)
- table = catalog.create_table(random_identifier, table_schema_nested)
- assert table.identifier == (catalog.name,) + random_identifier
- catalog.rename_table(table.identifier, another_random_identifier)
- new_table = catalog.load_table(another_random_identifier)
- assert new_table.identifier == (catalog.name,) + another_random_identifier
+ from_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(from_table_identifier)
+ to_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(to_table_identifier)
+ from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog)
+ to_namespace = Catalog.namespace_from(to_table_identifier_nocatalog)
+ catalog.create_namespace(from_namespace)
+ catalog.create_namespace(to_namespace)
+ table = catalog.create_table(from_table_identifier, table_schema_nested)
+ assert table.identifier == (catalog.name,) + from_table_identifier_nocatalog
+ catalog.rename_table(table.identifier, to_table_identifier)
+ new_table = catalog.load_table(to_table_identifier)
+ assert new_table.identifier == (catalog.name,) + to_table_identifier_nocatalog
assert new_table.metadata_location == table.metadata_location
with pytest.raises(NoSuchTableError):
catalog.load_table(table.identifier)
with pytest.raises(NoSuchTableError):
- catalog.load_table(random_identifier)
+ catalog.load_table(from_table_identifier)
@pytest.mark.parametrize(
@@ -547,19 +769,37 @@ def test_rename_table_from_self_identifier(
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
+@pytest.mark.parametrize(
+ "from_table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+@pytest.mark.parametrize(
+ "to_table_identifier",
+ [
+ lazy_fixture("another_random_table_identifier"),
+ lazy_fixture("another_random_hierarchical_identifier"),
+ lazy_fixture("another_random_table_identifier_with_catalog"),
+ ],
+)
def test_rename_table_to_existing_one(
- catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier
+ catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier
) -> None:
- from_database_name, _from_table_name = random_identifier
- to_database_name, _to_table_name = another_random_identifier
- catalog.create_namespace(from_database_name)
- catalog.create_namespace(to_database_name)
- table = catalog.create_table(random_identifier, table_schema_nested)
- assert table.identifier == (catalog.name,) + random_identifier
- new_table = catalog.create_table(another_random_identifier, table_schema_nested)
- assert new_table.identifier == (catalog.name,) + another_random_identifier
+ from_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(from_table_identifier)
+ to_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(to_table_identifier)
+ from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog)
+ to_namespace = Catalog.namespace_from(to_table_identifier_nocatalog)
+ catalog.create_namespace(from_namespace)
+ catalog.create_namespace(to_namespace)
+ table = catalog.create_table(from_table_identifier, table_schema_nested)
+ assert table.identifier == (catalog.name,) + from_table_identifier_nocatalog
+ new_table = catalog.create_table(to_table_identifier, table_schema_nested)
+ assert new_table.identifier == (catalog.name,) + to_table_identifier_nocatalog
with pytest.raises(TableAlreadyExistsError):
- catalog.rename_table(random_identifier, another_random_identifier)
+ catalog.rename_table(from_table_identifier, to_table_identifier)
@pytest.mark.parametrize(
@@ -570,11 +810,28 @@ def test_rename_table_to_existing_one(
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
-def test_rename_missing_table(catalog: SqlCatalog, random_identifier: Identifier, another_random_identifier: Identifier) -> None:
- to_database_name, _to_table_name = another_random_identifier
- catalog.create_namespace(to_database_name)
+@pytest.mark.parametrize(
+ "from_table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+@pytest.mark.parametrize(
+ "to_table_identifier",
+ [
+ lazy_fixture("another_random_table_identifier"),
+ lazy_fixture("another_random_hierarchical_identifier"),
+ lazy_fixture("another_random_table_identifier_with_catalog"),
+ ],
+)
+def test_rename_missing_table(catalog: SqlCatalog, from_table_identifier: Identifier, to_table_identifier: Identifier) -> None:
+ to_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(to_table_identifier)
+ to_namespace = Catalog.namespace_from(to_table_identifier_nocatalog)
+ catalog.create_namespace(to_namespace)
with pytest.raises(NoSuchTableError):
- catalog.rename_table(random_identifier, another_random_identifier)
+ catalog.rename_table(from_table_identifier, to_table_identifier)
@pytest.mark.parametrize(
@@ -585,15 +842,32 @@ def test_rename_missing_table(catalog: SqlCatalog, random_identifier: Identifier
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
+@pytest.mark.parametrize(
+ "from_table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+@pytest.mark.parametrize(
+ "to_table_identifier",
+ [
+ lazy_fixture("another_random_table_identifier"),
+ lazy_fixture("another_random_hierarchical_identifier"),
+ lazy_fixture("another_random_table_identifier_with_catalog"),
+ ],
+)
def test_rename_table_to_missing_namespace(
- catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier
+ catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier
) -> None:
- from_database_name, _from_table_name = random_identifier
- catalog.create_namespace(from_database_name)
- table = catalog.create_table(random_identifier, table_schema_nested)
- assert table.identifier == (catalog.name,) + random_identifier
+ from_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(from_table_identifier)
+ from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog)
+ catalog.create_namespace(from_namespace)
+ table = catalog.create_table(from_table_identifier, table_schema_nested)
+ assert table.identifier == (catalog.name,) + from_table_identifier_nocatalog
with pytest.raises(NoSuchNamespaceError):
- catalog.rename_table(random_identifier, another_random_identifier)
+ catalog.rename_table(from_table_identifier, to_table_identifier)
@pytest.mark.parametrize(
@@ -603,22 +877,40 @@ def test_rename_table_to_missing_namespace(
lazy_fixture('catalog_sqlite'),
],
)
+@pytest.mark.parametrize(
+ "table_identifier_1",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+@pytest.mark.parametrize(
+ "table_identifier_2",
+ [
+ lazy_fixture("another_random_table_identifier"),
+ lazy_fixture("another_random_hierarchical_identifier"),
+ lazy_fixture("another_random_table_identifier_with_catalog"),
+ ],
+)
def test_list_tables(
- catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier
+ catalog: SqlCatalog, table_schema_nested: Schema, table_identifier_1: Identifier, table_identifier_2: Identifier
) -> None:
- database_name_1, _table_name_1 = random_identifier
- database_name_2, _table_name_2 = another_random_identifier
- catalog.create_namespace(database_name_1)
- catalog.create_namespace(database_name_2)
- catalog.create_table(random_identifier, table_schema_nested)
- catalog.create_table(another_random_identifier, table_schema_nested)
- identifier_list = catalog.list_tables(database_name_1)
+ table_identifier_1_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier_1)
+ table_identifier_2_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier_2)
+ namespace_1 = Catalog.namespace_from(table_identifier_1_nocatalog)
+ namespace_2 = Catalog.namespace_from(table_identifier_2_nocatalog)
+ catalog.create_namespace(namespace_1)
+ catalog.create_namespace(namespace_2)
+ catalog.create_table(table_identifier_1, table_schema_nested)
+ catalog.create_table(table_identifier_2, table_schema_nested)
+ identifier_list = catalog.list_tables(namespace_1)
assert len(identifier_list) == 1
- assert random_identifier in identifier_list
+ assert table_identifier_1_nocatalog in identifier_list
- identifier_list = catalog.list_tables(database_name_2)
+ identifier_list = catalog.list_tables(namespace_2)
assert len(identifier_list) == 1
- assert another_random_identifier in identifier_list
+ assert table_identifier_2_nocatalog in identifier_list
@pytest.mark.parametrize(
@@ -628,9 +920,10 @@ def test_list_tables(
lazy_fixture('catalog_sqlite'),
],
)
-def test_create_namespace(catalog: SqlCatalog, database_name: str) -> None:
- catalog.create_namespace(database_name)
- assert (database_name,) in catalog.list_namespaces()
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
+def test_list_tables_when_missing_namespace(catalog: SqlCatalog, namespace: str) -> None:
+ with pytest.raises(NoSuchNamespaceError):
+ catalog.list_tables(namespace)
@pytest.mark.parametrize(
@@ -654,10 +947,24 @@ def test_create_namespace_if_not_exists(catalog:
SqlCatalog, database_name: str)
lazy_fixture('catalog_sqlite'),
],
)
-def test_create_duplicate_namespace(catalog: SqlCatalog, database_name: str)
-> None:
- catalog.create_namespace(database_name)
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
+def test_create_namespace(catalog: SqlCatalog, namespace: str) -> None:
+ catalog.create_namespace(namespace)
+ assert (Catalog.identifier_to_tuple(namespace)) in
catalog.list_namespaces()
+
+
+@pytest.mark.parametrize(
+ 'catalog',
+ [
+ lazy_fixture('catalog_memory'),
+ lazy_fixture('catalog_sqlite'),
+ ],
+)
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
+def test_create_duplicate_namespace(catalog: SqlCatalog, namespace: str) ->
None:
+ catalog.create_namespace(namespace)
with pytest.raises(NamespaceAlreadyExistsError):
- catalog.create_namespace(database_name)
+ catalog.create_namespace(namespace)
@pytest.mark.parametrize(
@@ -667,10 +974,11 @@ def test_create_duplicate_namespace(catalog: SqlCatalog,
database_name: str) ->
lazy_fixture('catalog_sqlite'),
],
)
-def test_create_namespaces_sharing_same_prefix(catalog: SqlCatalog,
database_name: str) -> None:
- catalog.create_namespace(database_name + "_1")
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
+def test_create_namespaces_sharing_same_prefix(catalog: SqlCatalog, namespace:
str) -> None:
+ catalog.create_namespace(namespace + "_1")
# Second namespace is a prefix of the first one, make sure it can be added.
- catalog.create_namespace(database_name)
+ catalog.create_namespace(namespace)
@pytest.mark.parametrize(
@@ -680,16 +988,17 @@ def test_create_namespaces_sharing_same_prefix(catalog:
SqlCatalog, database_nam
lazy_fixture('catalog_sqlite'),
],
)
-def test_create_namespace_with_comment_and_location(catalog: SqlCatalog,
database_name: str) -> None:
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
+def test_create_namespace_with_comment_and_location(catalog: SqlCatalog,
namespace: str) -> None:
test_location = "/test/location"
test_properties = {
"comment": "this is a test description",
"location": test_location,
}
- catalog.create_namespace(namespace=database_name,
properties=test_properties)
+ catalog.create_namespace(namespace=namespace, properties=test_properties)
loaded_database_list = catalog.list_namespaces()
- assert (database_name,) in loaded_database_list
- properties = catalog.load_namespace_properties(database_name)
+ assert Catalog.identifier_to_tuple(namespace) in loaded_database_list
+ properties = catalog.load_namespace_properties(namespace)
assert properties["comment"] == "this is a test description"
assert properties["location"] == test_location
@@ -701,13 +1010,27 @@ def
test_create_namespace_with_comment_and_location(catalog: SqlCatalog, databas
lazy_fixture('catalog_sqlite'),
],
)
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
@pytest.mark.filterwarnings("ignore")
-def test_create_namespace_with_null_properties(catalog: SqlCatalog,
database_name: str) -> None:
+def test_create_namespace_with_null_properties(catalog: SqlCatalog, namespace:
str) -> None:
with pytest.raises(IntegrityError):
- catalog.create_namespace(namespace=database_name, properties={None:
"value"}) # type: ignore
+ catalog.create_namespace(namespace=namespace, properties={None:
"value"}) # type: ignore
with pytest.raises(IntegrityError):
- catalog.create_namespace(namespace=database_name, properties={"key":
None})
+ catalog.create_namespace(namespace=namespace, properties={"key": None})
+
+
+@pytest.mark.parametrize(
+ 'catalog',
+ [
+ lazy_fixture('catalog_memory'),
+ lazy_fixture('catalog_sqlite'),
+ ],
+)
+@pytest.mark.parametrize("empty_namespace", ["", (), (""), ("", ""), " ", ("
")])
+def test_create_namespace_with_empty_identifier(catalog: SqlCatalog,
empty_namespace: Any) -> None:
+ with pytest.raises(NoSuchNamespaceError):
+ catalog.create_namespace(empty_namespace)
@pytest.mark.parametrize(
@@ -717,13 +1040,17 @@ def test_create_namespace_with_null_properties(catalog:
SqlCatalog, database_nam
lazy_fixture('catalog_sqlite'),
],
)
-def test_list_namespaces(catalog: SqlCatalog, database_list: List[str]) ->
None:
- for database_name in database_list:
- catalog.create_namespace(database_name)
- db_list = catalog.list_namespaces()
- for database_name in database_list:
- assert (database_name,) in db_list
- assert len(catalog.list_namespaces(database_name)) == 1
+@pytest.mark.parametrize("namespace_list", [lazy_fixture("database_list"),
lazy_fixture("hierarchical_namespace_list")])
+def test_list_namespaces(catalog: SqlCatalog, namespace_list: List[str]) ->
None:
+ for namespace in namespace_list:
+ catalog.create_namespace(namespace)
+ # Test global list
+ ns_list = catalog.list_namespaces()
+ for namespace in namespace_list:
+ assert Catalog.identifier_to_tuple(namespace) in ns_list
+ # Test individual namespace list
+ assert len(one_namespace := catalog.list_namespaces(namespace)) == 1
+ assert Catalog.identifier_to_tuple(namespace) == one_namespace[0]
@pytest.mark.parametrize(
@@ -745,16 +1072,25 @@ def test_list_non_existing_namespaces(catalog:
SqlCatalog) -> None:
lazy_fixture('catalog_sqlite'),
],
)
-def test_drop_namespace(catalog: SqlCatalog, table_schema_nested: Schema,
random_identifier: Identifier) -> None:
- database_name, table_name = random_identifier
- catalog.create_namespace(database_name)
- assert (database_name,) in catalog.list_namespaces()
- catalog.create_table((database_name, table_name), table_schema_nested)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_drop_namespace(catalog: SqlCatalog, table_schema_nested: Schema,
table_identifier: Identifier) -> None:
+ table_identifier_nocatalog =
catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ assert namespace in catalog.list_namespaces()
+ catalog.create_table(table_identifier, table_schema_nested)
with pytest.raises(NamespaceNotEmptyError):
- catalog.drop_namespace(database_name)
- catalog.drop_table((database_name, table_name))
- catalog.drop_namespace(database_name)
- assert (database_name,) not in catalog.list_namespaces()
+ catalog.drop_namespace(namespace)
+ catalog.drop_table(table_identifier)
+ catalog.drop_namespace(namespace)
+ assert namespace not in catalog.list_namespaces()
@pytest.mark.parametrize(
@@ -764,18 +1100,19 @@ def test_drop_namespace(catalog: SqlCatalog,
table_schema_nested: Schema, random
lazy_fixture('catalog_sqlite'),
],
)
-def test_load_namespace_properties(catalog: SqlCatalog, database_name: str) ->
None:
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
+def test_load_namespace_properties(catalog: SqlCatalog, namespace: str) ->
None:
warehouse_location = "/test/location"
test_properties = {
"comment": "this is a test description",
- "location": f"{warehouse_location}/{database_name}.db",
+ "location": f"{warehouse_location}/{namespace}.db",
"test_property1": "1",
"test_property2": "2",
"test_property3": "3",
}
- catalog.create_namespace(database_name, test_properties)
- listed_properties = catalog.load_namespace_properties(database_name)
+ catalog.create_namespace(namespace, test_properties)
+ listed_properties = catalog.load_namespace_properties(namespace)
for k, v in listed_properties.items():
assert k in test_properties
assert v == test_properties[k]
@@ -788,9 +1125,10 @@ def test_load_namespace_properties(catalog: SqlCatalog,
database_name: str) -> N
lazy_fixture('catalog_sqlite'),
],
)
-def test_load_empty_namespace_properties(catalog: SqlCatalog, database_name:
str) -> None:
- catalog.create_namespace(database_name)
- listed_properties = catalog.load_namespace_properties(database_name)
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
+def test_load_empty_namespace_properties(catalog: SqlCatalog, namespace: str)
-> None:
+ catalog.create_namespace(namespace)
+ listed_properties = catalog.load_namespace_properties(namespace)
assert listed_properties == {"exists": "true"}
@@ -813,19 +1151,20 @@ def
test_load_namespace_properties_non_existing_namespace(catalog: SqlCatalog) -
lazy_fixture('catalog_sqlite'),
],
)
-def test_update_namespace_properties(catalog: SqlCatalog, database_name: str)
-> None:
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"),
lazy_fixture("hierarchical_namespace_name")])
+def test_update_namespace_properties(catalog: SqlCatalog, namespace: str) ->
None:
warehouse_location = "/test/location"
test_properties = {
"comment": "this is a test description",
- "location": f"{warehouse_location}/{database_name}.db",
+ "location": f"{warehouse_location}/{namespace}.db",
"test_property1": "1",
"test_property2": "2",
"test_property3": "3",
}
removals = {"test_property1", "test_property2", "test_property3",
"should_not_removed"}
updates = {"test_property4": "4", "test_property5": "5", "comment":
"updated test description"}
- catalog.create_namespace(database_name, test_properties)
- update_report = catalog.update_namespace_properties(database_name,
removals, updates)
+ catalog.create_namespace(namespace, test_properties)
+ update_report = catalog.update_namespace_properties(namespace, removals,
updates)
for k in updates.keys():
assert k in update_report.updated
for k in removals:
@@ -833,7 +1172,7 @@ def test_update_namespace_properties(catalog: SqlCatalog,
database_name: str) ->
assert k in update_report.missing
else:
assert k in update_report.removed
- assert "updated test description" ==
catalog.load_namespace_properties(database_name)["comment"]
+ assert "updated test description" ==
catalog.load_namespace_properties(namespace)["comment"]
@pytest.mark.parametrize(
@@ -844,10 +1183,19 @@ def test_update_namespace_properties(catalog:
SqlCatalog, database_name: str) ->
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
-def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema,
random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table = catalog.create_table(random_identifier, table_schema_nested)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema,
table_identifier: Identifier) -> None:
+ table_identifier_nocatalog =
catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(table_identifier, table_schema_nested)
assert catalog._parse_metadata_version(table.metadata_location) == 0
assert table.metadata.current_schema_id == 0
@@ -878,10 +1226,19 @@ def test_commit_table(catalog: SqlCatalog,
table_schema_nested: Schema, random_i
lazy_fixture('catalog_sqlite_fsspec'),
],
)
-def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema,
random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table = catalog.create_table(random_identifier, table_schema_simple)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema,
table_identifier: Identifier) -> None:
+ table_identifier_nocatalog =
catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table = catalog.create_table(table_identifier, table_schema_simple)
df = pa.Table.from_pydict(
{
@@ -918,11 +1275,20 @@ def test_append_table(catalog: SqlCatalog,
table_schema_simple: Schema, random_i
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
-def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple:
Schema, random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- table_a = catalog.create_table(random_identifier, table_schema_simple)
- table_b = catalog.load_table(random_identifier)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple:
Schema, table_identifier: Identifier) -> None:
+ table_identifier_nocatalog =
catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ table_a = catalog.create_table(table_identifier, table_schema_simple)
+ table_b = catalog.load_table(table_identifier)
with table_a.update_schema() as update:
update.add_column(path="b", field_type=IntegerType())
@@ -992,12 +1358,21 @@ def test_write_and_evolve(catalog: SqlCatalog,
format_version: int) -> None:
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
-def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple:
Schema, random_identifier: Identifier) -> None:
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple:
Schema, table_identifier: Identifier) -> None:
# table properties can be set to int, but still serialized to string
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
+ table_identifier_nocatalog =
catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
property_with_int = {"property_name": 42}
- table = catalog.create_table(random_identifier, table_schema_simple,
properties=property_with_int)
+ table = catalog.create_table(table_identifier, table_schema_simple,
properties=property_with_int)
assert isinstance(table.properties["property_name"], str)
@@ -1009,14 +1384,23 @@ def test_table_properties_int_value(catalog:
SqlCatalog, table_schema_simple: Sc
lazy_fixture('catalog_sqlite_without_rowcount'),
],
)
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
def test_table_properties_raise_for_none_value(
- catalog: SqlCatalog, table_schema_simple: Schema, random_identifier:
Identifier
+ catalog: SqlCatalog, table_schema_simple: Schema, table_identifier:
Identifier
) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
+ table_identifier_nocatalog =
catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
property_with_none = {"property_name": None}
with pytest.raises(ValidationError) as exc_info:
- _ = catalog.create_table(random_identifier, table_schema_simple,
properties=property_with_none)
+ _ = catalog.create_table(table_identifier, table_schema_simple,
properties=property_with_none)
assert "None type is not a supported value in properties: property_name"
in str(exc_info.value)
@@ -1027,11 +1411,20 @@ def test_table_properties_raise_for_none_value(
lazy_fixture('catalog_sqlite'),
],
)
-def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema,
random_identifier: Identifier) -> None:
- database_name, _table_name = random_identifier
- catalog.create_namespace(database_name)
- catalog.create_table(random_identifier, table_schema_simple,
properties={"format-version": "2"})
- existing_table = random_identifier
+@pytest.mark.parametrize(
+ "table_identifier",
+ [
+ lazy_fixture("random_table_identifier"),
+ lazy_fixture("random_hierarchical_identifier"),
+ lazy_fixture("random_table_identifier_with_catalog"),
+ ],
+)
+def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema,
table_identifier: Identifier) -> None:
+ table_identifier_nocatalog =
catalog.identifier_to_tuple_without_catalog(table_identifier)
+ namespace = Catalog.namespace_from(table_identifier_nocatalog)
+ catalog.create_namespace(namespace)
+ catalog.create_table(table_identifier, table_schema_simple,
properties={"format-version": "2"})
+ existing_table = table_identifier
# Act and Assert for an existing table
assert catalog.table_exists(existing_table) is True
diff --git a/tests/conftest.py b/tests/conftest.py
index 66795436..4baefafe 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1878,6 +1878,19 @@ def database_list(database_name: str) -> List[str]:
return [f"{database_name}_{idx}" for idx in range(NUM_TABLES)]
+@pytest.fixture()
+def hierarchical_namespace_name() -> str:
+ prefix = "my_iceberg_ns-"
+ random_tag1 = "".join(choice(string.ascii_letters) for _ in
range(RANDOM_LENGTH))
+ random_tag2 = "".join(choice(string.ascii_letters) for _ in
range(RANDOM_LENGTH))
+ return ".".join([prefix + random_tag1, prefix + random_tag2]).lower()
+
+
+@pytest.fixture()
+def hierarchical_namespace_list(hierarchical_namespace_name: str) -> List[str]:
+ return [f"{hierarchical_namespace_name}_{idx}" for idx in
range(NUM_TABLES)]
+
+
BUCKET_NAME = "test_bucket"
TABLE_METADATA_LOCATION_REGEX = re.compile(
r"""s3://test_bucket/my_iceberg_database-[a-z]{20}.db/