This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 756ae625 Introduce hierarchical namespaces into SqlCatalog (#591)
756ae625 is described below

commit 756ae625a2ea0f9c12df78430512ce991f6a1976
Author: Eric L (CCCS) <[email protected]>
AuthorDate: Tue May 28 03:52:24 2024 -0400

    Introduce hierarchical namespaces into SqlCatalog (#591)
    
    * Introduce hierarchical namespaces into SqlCatalog
    
    * Fix SqlCatalog unit tests broken from code update.
---
 pyiceberg/catalog/__init__.py |  25 +-
 pyiceberg/catalog/sql.py      | 159 ++++----
 pyiceberg/cli/console.py      |   8 +-
 tests/catalog/test_sql.py     | 847 +++++++++++++++++++++++++++++++-----------
 tests/conftest.py             |  13 +
 5 files changed, 758 insertions(+), 294 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 0b70fe32..ea2bc657 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -588,7 +588,7 @@ class Catalog(ABC):
         If the identifier is a string, it is split into a tuple on '.'. If it 
is a tuple, it is used as-is.
 
         Args:
-            identifier (str | Identifier: an identifier, either a string or 
tuple of strings.
+            identifier (str | Identifier): an identifier, either a string or 
tuple of strings.
 
         Returns:
             Identifier: a tuple of strings.
@@ -619,6 +619,29 @@ class Catalog(ABC):
         """
         return Catalog.identifier_to_tuple(identifier)[:-1]
 
+    @staticmethod
+    def namespace_to_string(
+        identifier: Union[str, Identifier], err: Union[Type[ValueError], 
Type[NoSuchNamespaceError]] = ValueError
+    ) -> str:
+        """Transform a namespace identifier into a string.
+
+        Args:
+            identifier (Union[str, Identifier]): a namespace identifier.
+            err (Union[Type[ValueError], Type[NoSuchNamespaceError]]): the 
error type to raise when identifier is empty.
+
+        Returns:
+            Identifier: Namespace identifier.
+        """
+        tuple_identifier = Catalog.identifier_to_tuple(identifier)
+        if len(tuple_identifier) < 1:
+            raise err("Empty namespace identifier")
+
+        # Check if any segment of the tuple is an empty string
+        if any(segment.strip() == "" for segment in tuple_identifier):
+            raise err("Namespace identifier contains an empty segment or a 
segment with only whitespace")
+
+        return ".".join(segment.strip() for segment in tuple_identifier)
+
     @staticmethod
     def identifier_to_database(
         identifier: Union[str, Identifier], err: Union[Type[ValueError], 
Type[NoSuchNamespaceError]] = ValueError
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index 978109b2..6c198767 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -43,6 +43,7 @@ from sqlalchemy.orm import (
 
 from pyiceberg.catalog import (
     METADATA_LOCATION,
+    Catalog,
     MetastoreCatalog,
     PropertiesUpdateSummary,
 )
@@ -94,6 +95,16 @@ class IcebergNamespaceProperties(SqlCatalogBaseTable):
 
 
 class SqlCatalog(MetastoreCatalog):
+    """Implementation of a SQL based catalog.
+
+    In the `JDBCCatalog` implementation, a `Namespace` is composed of a list 
of strings separated by dots: `'ns1.ns2.ns3'`.
+    And you can have as many levels as you want, but you need at least one.  
The `SqlCatalog` honors the same convention.
+
+    In the `JDBCCatalog` implementation, a `TableIdentifier` is composed of an 
optional `Namespace` and a table name.
+    When a `Namespace` is present, the full name will be 
`'ns1.ns2.ns3.table'`.  A valid `TableIdentifier` could be `'name'` (no 
namespace).
+    The `SqlCatalog` has a different convention where a `TableIdentifier` 
requires a `Namespace`.
+    """
+
     def __init__(self, name: str, **properties: str):
         super().__init__(name, **properties)
 
@@ -136,7 +147,7 @@ class SqlCatalog(MetastoreCatalog):
         file = io.new_input(metadata_location)
         metadata = FromInputFile.table_metadata(file)
         return Table(
-            identifier=(self.name, table_namespace, table_name),
+            identifier=(self.name,) + 
Catalog.identifier_to_tuple(table_namespace) + (table_name,),
             metadata=metadata,
             metadata_location=metadata_location,
             io=self._load_file_io(metadata.properties, metadata_location),
@@ -173,11 +184,14 @@ class SqlCatalog(MetastoreCatalog):
         """
         schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
 
-        database_name, table_name = 
self.identifier_to_database_and_table(identifier)
-        if not self._namespace_exists(database_name):
-            raise NoSuchNamespaceError(f"Namespace does not exist: 
{database_name}")
+        identifier_nocatalog = 
self.identifier_to_tuple_without_catalog(identifier)
+        namespace_identifier = Catalog.namespace_from(identifier_nocatalog)
+        table_name = Catalog.table_name_from(identifier_nocatalog)
+        if not self._namespace_exists(namespace_identifier):
+            raise NoSuchNamespaceError(f"Namespace does not exist: 
{namespace_identifier}")
 
-        location = self._resolve_table_location(location, database_name, 
table_name)
+        namespace = Catalog.namespace_to_string(namespace_identifier)
+        location = self._resolve_table_location(location, namespace, 
table_name)
         metadata_location = self._get_metadata_location(location=location)
         metadata = new_table_metadata(
             location=location, schema=schema, partition_spec=partition_spec, 
sort_order=sort_order, properties=properties
@@ -190,7 +204,7 @@ class SqlCatalog(MetastoreCatalog):
                 session.add(
                     IcebergTables(
                         catalog_name=self.name,
-                        table_namespace=database_name,
+                        table_namespace=namespace,
                         table_name=table_name,
                         metadata_location=metadata_location,
                         previous_metadata_location=None,
@@ -198,7 +212,7 @@ class SqlCatalog(MetastoreCatalog):
                 )
                 session.commit()
             except IntegrityError as e:
-                raise TableAlreadyExistsError(f"Table 
{database_name}.{table_name} already exists") from e
+                raise TableAlreadyExistsError(f"Table {namespace}.{table_name} 
already exists") from e
 
         return self.load_table(identifier=identifier)
 
@@ -216,16 +230,19 @@ class SqlCatalog(MetastoreCatalog):
             TableAlreadyExistsError: If the table already exists
             NoSuchNamespaceError: If namespace does not exist
         """
-        database_name, table_name = 
self.identifier_to_database_and_table(identifier)
-        if not self._namespace_exists(database_name):
-            raise NoSuchNamespaceError(f"Namespace does not exist: 
{database_name}")
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+        namespace_tuple = Catalog.namespace_from(identifier_tuple)
+        namespace = Catalog.namespace_to_string(namespace_tuple)
+        table_name = Catalog.table_name_from(identifier_tuple)
+        if not self._namespace_exists(namespace):
+            raise NoSuchNamespaceError(f"Namespace does not exist: 
{namespace}")
 
         with Session(self.engine) as session:
             try:
                 session.add(
                     IcebergTables(
                         catalog_name=self.name,
-                        table_namespace=database_name,
+                        table_namespace=namespace,
                         table_name=table_name,
                         metadata_location=metadata_location,
                         previous_metadata_location=None,
@@ -233,7 +250,7 @@ class SqlCatalog(MetastoreCatalog):
                 )
                 session.commit()
             except IntegrityError as e:
-                raise TableAlreadyExistsError(f"Table 
{database_name}.{table_name} already exists") from e
+                raise TableAlreadyExistsError(f"Table {namespace}.{table_name} 
already exists") from e
 
         return self.load_table(identifier=identifier)
 
@@ -253,17 +270,19 @@ class SqlCatalog(MetastoreCatalog):
             NoSuchTableError: If a table with the name does not exist.
         """
         identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
-        database_name, table_name = 
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
+        namespace_tuple = Catalog.namespace_from(identifier_tuple)
+        namespace = Catalog.namespace_to_string(namespace_tuple)
+        table_name = Catalog.table_name_from(identifier_tuple)
         with Session(self.engine) as session:
             stmt = select(IcebergTables).where(
                 IcebergTables.catalog_name == self.name,
-                IcebergTables.table_namespace == database_name,
+                IcebergTables.table_namespace == namespace,
                 IcebergTables.table_name == table_name,
             )
             result = session.scalar(stmt)
         if result:
             return self._convert_orm_to_iceberg(result)
-        raise NoSuchTableError(f"Table does not exist: 
{database_name}.{table_name}")
+        raise NoSuchTableError(f"Table does not exist: 
{namespace}.{table_name}")
 
     def drop_table(self, identifier: Union[str, Identifier]) -> None:
         """Drop a table.
@@ -275,18 +294,20 @@ class SqlCatalog(MetastoreCatalog):
             NoSuchTableError: If a table with the name does not exist.
         """
         identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
-        database_name, table_name = 
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
+        namespace_tuple = Catalog.namespace_from(identifier_tuple)
+        namespace = Catalog.namespace_to_string(namespace_tuple)
+        table_name = Catalog.table_name_from(identifier_tuple)
         with Session(self.engine) as session:
             if self.engine.dialect.supports_sane_rowcount:
                 res = session.execute(
                     delete(IcebergTables).where(
                         IcebergTables.catalog_name == self.name,
-                        IcebergTables.table_namespace == database_name,
+                        IcebergTables.table_namespace == namespace,
                         IcebergTables.table_name == table_name,
                     )
                 )
                 if res.rowcount < 1:
-                    raise NoSuchTableError(f"Table does not exist: 
{database_name}.{table_name}")
+                    raise NoSuchTableError(f"Table does not exist: 
{namespace}.{table_name}")
             else:
                 try:
                     tbl = (
@@ -294,14 +315,14 @@ class SqlCatalog(MetastoreCatalog):
                         .with_for_update(of=IcebergTables)
                         .filter(
                             IcebergTables.catalog_name == self.name,
-                            IcebergTables.table_namespace == database_name,
+                            IcebergTables.table_namespace == namespace,
                             IcebergTables.table_name == table_name,
                         )
                         .one()
                     )
                     session.delete(tbl)
                 except NoResultFound as e:
-                    raise NoSuchTableError(f"Table does not exist: 
{database_name}.{table_name}") from e
+                    raise NoSuchTableError(f"Table does not exist: 
{namespace}.{table_name}") from e
             session.commit()
 
     def rename_table(self, from_identifier: Union[str, Identifier], 
to_identifier: Union[str, Identifier]) -> Table:
@@ -320,10 +341,15 @@ class SqlCatalog(MetastoreCatalog):
             NoSuchNamespaceError: If the target namespace does not exist.
         """
         from_identifier_tuple = 
self.identifier_to_tuple_without_catalog(from_identifier)
-        from_database_name, from_table_name = 
self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
-        to_database_name, to_table_name = 
self.identifier_to_database_and_table(to_identifier)
-        if not self._namespace_exists(to_database_name):
-            raise NoSuchNamespaceError(f"Namespace does not exist: 
{to_database_name}")
+        to_identifier_tuple = 
self.identifier_to_tuple_without_catalog(to_identifier)
+        from_namespace_tuple = Catalog.namespace_from(from_identifier_tuple)
+        from_namespace = Catalog.namespace_to_string(from_namespace_tuple)
+        from_table_name = Catalog.table_name_from(from_identifier_tuple)
+        to_namespace_tuple = Catalog.namespace_from(to_identifier_tuple)
+        to_namespace = Catalog.namespace_to_string(to_namespace_tuple)
+        to_table_name = Catalog.table_name_from(to_identifier_tuple)
+        if not self._namespace_exists(to_namespace):
+            raise NoSuchNamespaceError(f"Namespace does not exist: 
{to_namespace}")
         with Session(self.engine) as session:
             try:
                 if self.engine.dialect.supports_sane_rowcount:
@@ -331,10 +357,10 @@ class SqlCatalog(MetastoreCatalog):
                         update(IcebergTables)
                         .where(
                             IcebergTables.catalog_name == self.name,
-                            IcebergTables.table_namespace == 
from_database_name,
+                            IcebergTables.table_namespace == from_namespace,
                             IcebergTables.table_name == from_table_name,
                         )
-                        .values(table_namespace=to_database_name, 
table_name=to_table_name)
+                        .values(table_namespace=to_namespace, 
table_name=to_table_name)
                     )
                     result = session.execute(stmt)
                     if result.rowcount < 1:
@@ -346,18 +372,18 @@ class SqlCatalog(MetastoreCatalog):
                             .with_for_update(of=IcebergTables)
                             .filter(
                                 IcebergTables.catalog_name == self.name,
-                                IcebergTables.table_namespace == 
from_database_name,
+                                IcebergTables.table_namespace == 
from_namespace,
                                 IcebergTables.table_name == from_table_name,
                             )
                             .one()
                         )
-                        tbl.table_namespace = to_database_name
+                        tbl.table_namespace = to_namespace
                         tbl.table_name = to_table_name
                     except NoResultFound as e:
                         raise NoSuchTableError(f"Table does not exist: 
{from_table_name}") from e
                 session.commit()
             except IntegrityError as e:
-                raise TableAlreadyExistsError(f"Table 
{to_database_name}.{to_table_name} already exists") from e
+                raise TableAlreadyExistsError(f"Table 
{to_namespace}.{to_table_name} already exists") from e
         return self.load_table(to_identifier)
 
     def _commit_table(self, table_request: CommitTableRequest) -> 
CommitTableResponse:
@@ -377,7 +403,9 @@ class SqlCatalog(MetastoreCatalog):
             tuple(table_request.identifier.namespace.root + 
[table_request.identifier.name])
         )
         current_table = self.load_table(identifier_tuple)
-        database_name, table_name = 
self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
+        namespace_tuple = Catalog.namespace_from(identifier_tuple)
+        namespace = Catalog.namespace_to_string(namespace_tuple)
+        table_name = Catalog.table_name_from(identifier_tuple)
         base_metadata = current_table.metadata
         for requirement in table_request.requirements:
             requirement.validate(base_metadata)
@@ -398,7 +426,7 @@ class SqlCatalog(MetastoreCatalog):
                     update(IcebergTables)
                     .where(
                         IcebergTables.catalog_name == self.name,
-                        IcebergTables.table_namespace == database_name,
+                        IcebergTables.table_namespace == namespace,
                         IcebergTables.table_name == table_name,
                         IcebergTables.metadata_location == 
current_table.metadata_location,
                     )
@@ -406,7 +434,7 @@ class SqlCatalog(MetastoreCatalog):
                 )
                 result = session.execute(stmt)
                 if result.rowcount < 1:
-                    raise CommitFailedException(f"Table has been updated by 
another process: {database_name}.{table_name}")
+                    raise CommitFailedException(f"Table has been updated by 
another process: {namespace}.{table_name}")
             else:
                 try:
                     tbl = (
@@ -414,7 +442,7 @@ class SqlCatalog(MetastoreCatalog):
                         .with_for_update(of=IcebergTables)
                         .filter(
                             IcebergTables.catalog_name == self.name,
-                            IcebergTables.table_namespace == database_name,
+                            IcebergTables.table_namespace == namespace,
                             IcebergTables.table_name == table_name,
                             IcebergTables.metadata_location == 
current_table.metadata_location,
                         )
@@ -423,13 +451,14 @@ class SqlCatalog(MetastoreCatalog):
                     tbl.metadata_location = new_metadata_location
                     tbl.previous_metadata_location = 
current_table.metadata_location
                 except NoResultFound as e:
-                    raise CommitFailedException(f"Table has been updated by 
another process: {database_name}.{table_name}") from e
+                    raise CommitFailedException(f"Table has been updated by 
another process: {namespace}.{table_name}") from e
             session.commit()
 
         return CommitTableResponse(metadata=updated_metadata, 
metadata_location=new_metadata_location)
 
     def _namespace_exists(self, identifier: Union[str, Identifier]) -> bool:
-        namespace = self.identifier_to_database(identifier)
+        namespace_tuple = Catalog.identifier_to_tuple(identifier)
+        namespace = Catalog.namespace_to_string(namespace_tuple, 
NoSuchNamespaceError)
         with Session(self.engine) as session:
             stmt = (
                 select(IcebergTables)
@@ -462,18 +491,20 @@ class SqlCatalog(MetastoreCatalog):
         Raises:
             NamespaceAlreadyExistsError: If a namespace with the given name 
already exists.
         """
+        if self._namespace_exists(namespace):
+            raise NamespaceAlreadyExistsError(f"Namespace {namespace} already 
exists")
+
         if not properties:
             properties = 
IcebergNamespaceProperties.NAMESPACE_MINIMAL_PROPERTIES
-        database_name = self.identifier_to_database(namespace)
-        if self._namespace_exists(database_name):
-            raise NamespaceAlreadyExistsError(f"Database {database_name} 
already exists")
-
         create_properties = properties if properties else 
IcebergNamespaceProperties.NAMESPACE_MINIMAL_PROPERTIES
         with Session(self.engine) as session:
             for key, value in create_properties.items():
                 session.add(
                     IcebergNamespaceProperties(
-                        catalog_name=self.name, namespace=database_name, 
property_key=key, property_value=value
+                        catalog_name=self.name,
+                        namespace=Catalog.namespace_to_string(namespace, 
NoSuchNamespaceError),
+                        property_key=key,
+                        property_value=value,
                     )
                 )
             session.commit()
@@ -488,16 +519,16 @@ class SqlCatalog(MetastoreCatalog):
             NoSuchNamespaceError: If a namespace with the given name does not 
exist.
             NamespaceNotEmptyError: If the namespace is not empty.
         """
-        database_name = self.identifier_to_database(namespace, 
NoSuchNamespaceError)
-        if self._namespace_exists(database_name):
-            if tables := self.list_tables(database_name):
-                raise NamespaceNotEmptyError(f"Database {database_name} is not 
empty. {len(tables)} tables exist.")
+        if self._namespace_exists(namespace):
+            namespace_str = Catalog.namespace_to_string(namespace)
+            if tables := self.list_tables(namespace):
+                raise NamespaceNotEmptyError(f"Namespace {namespace_str} is 
not empty. {len(tables)} tables exist.")
 
             with Session(self.engine) as session:
                 session.execute(
                     delete(IcebergNamespaceProperties).where(
                         IcebergNamespaceProperties.catalog_name == self.name,
-                        IcebergNamespaceProperties.namespace == database_name,
+                        IcebergNamespaceProperties.namespace == namespace_str,
                     )
                 )
                 session.commit()
@@ -516,14 +547,14 @@ class SqlCatalog(MetastoreCatalog):
         Raises:
             NoSuchNamespaceError: If a namespace with the given name does not 
exist.
         """
-        database_name = self.identifier_to_database(namespace, 
NoSuchNamespaceError)
+        if namespace and not self._namespace_exists(namespace):
+            raise NoSuchNamespaceError(f"Namespace does not exist: 
{namespace}")
 
-        stmt = select(IcebergTables).where(
-            IcebergTables.catalog_name == self.name, 
IcebergTables.table_namespace == database_name
-        )
+        namespace = Catalog.namespace_to_string(namespace)
+        stmt = select(IcebergTables).where(IcebergTables.catalog_name == 
self.name, IcebergTables.table_namespace == namespace)
         with Session(self.engine) as session:
             result = session.scalars(stmt)
-            return [(table.table_namespace, table.table_name) for table in 
result]
+            return [(Catalog.identifier_to_tuple(table.table_namespace) + 
(table.table_name,)) for table in result]
 
     def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> 
List[Identifier]:
         """List namespaces from the given namespace. If not given, list 
top-level namespaces from the catalog.
@@ -543,15 +574,15 @@ class SqlCatalog(MetastoreCatalog):
         table_stmt = 
select(IcebergTables.table_namespace).where(IcebergTables.catalog_name == 
self.name)
         namespace_stmt = 
select(IcebergNamespaceProperties.namespace).where(IcebergNamespaceProperties.catalog_name
 == self.name)
         if namespace:
-            database_name = self.identifier_to_database(namespace, 
NoSuchNamespaceError)
-            table_stmt = 
table_stmt.where(IcebergTables.table_namespace.like(database_name))
-            namespace_stmt = 
namespace_stmt.where(IcebergNamespaceProperties.namespace.like(database_name))
+            namespace_str = Catalog.namespace_to_string(namespace, 
NoSuchNamespaceError)
+            table_stmt = 
table_stmt.where(IcebergTables.table_namespace.like(namespace_str))
+            namespace_stmt = 
namespace_stmt.where(IcebergNamespaceProperties.namespace.like(namespace_str))
         stmt = union(
             table_stmt,
             namespace_stmt,
         )
         with Session(self.engine) as session:
-            return [self.identifier_to_tuple(namespace_col) for namespace_col 
in session.execute(stmt).scalars()]
+            return [Catalog.identifier_to_tuple(namespace_col) for 
namespace_col in session.execute(stmt).scalars()]
 
     def load_namespace_properties(self, namespace: Union[str, Identifier]) -> 
Properties:
         """Get properties for a namespace.
@@ -565,12 +596,12 @@ class SqlCatalog(MetastoreCatalog):
         Raises:
             NoSuchNamespaceError: If a namespace with the given name does not 
exist.
         """
-        database_name = self.identifier_to_database(namespace)
-        if not self._namespace_exists(database_name):
-            raise NoSuchNamespaceError(f"Database {database_name} does not 
exists")
+        namespace_str = Catalog.namespace_to_string(namespace)
+        if not self._namespace_exists(namespace):
+            raise NoSuchNamespaceError(f"Namespace {namespace_str} does not 
exists")
 
         stmt = select(IcebergNamespaceProperties).where(
-            IcebergNamespaceProperties.catalog_name == self.name, 
IcebergNamespaceProperties.namespace == database_name
+            IcebergNamespaceProperties.catalog_name == self.name, 
IcebergNamespaceProperties.namespace == namespace_str
         )
         with Session(self.engine) as session:
             result = session.scalars(stmt)
@@ -590,9 +621,9 @@ class SqlCatalog(MetastoreCatalog):
             NoSuchNamespaceError: If a namespace with the given name does not 
exist.
             ValueError: If removals and updates have overlapping keys.
         """
-        database_name = self.identifier_to_database(namespace)
-        if not self._namespace_exists(database_name):
-            raise NoSuchNamespaceError(f"Database {database_name} does not 
exists")
+        namespace_str = Catalog.namespace_to_string(namespace)
+        if not self._namespace_exists(namespace):
+            raise NoSuchNamespaceError(f"Namespace {namespace_str} does not 
exists")
 
         current_properties = 
self.load_namespace_properties(namespace=namespace)
         properties_update_summary = self._get_updated_props_and_update_summary(
@@ -603,7 +634,7 @@ class SqlCatalog(MetastoreCatalog):
             if removals:
                 delete_stmt = delete(IcebergNamespaceProperties).where(
                     IcebergNamespaceProperties.catalog_name == self.name,
-                    IcebergNamespaceProperties.namespace == database_name,
+                    IcebergNamespaceProperties.namespace == namespace_str,
                     IcebergNamespaceProperties.property_key.in_(removals),
                 )
                 session.execute(delete_stmt)
@@ -614,14 +645,14 @@ class SqlCatalog(MetastoreCatalog):
                 # This is not a problem since it runs in a single transaction
                 delete_stmt = delete(IcebergNamespaceProperties).where(
                     IcebergNamespaceProperties.catalog_name == self.name,
-                    IcebergNamespaceProperties.namespace == database_name,
+                    IcebergNamespaceProperties.namespace == namespace_str,
                     
IcebergNamespaceProperties.property_key.in_(set(updates.keys())),
                 )
                 session.execute(delete_stmt)
                 insert_stmt = insert(IcebergNamespaceProperties)
                 for property_key, property_value in updates.items():
                     insert_stmt = insert_stmt.values(
-                        catalog_name=self.name, namespace=database_name, 
property_key=property_key, property_value=property_value
+                        catalog_name=self.name, namespace=namespace_str, 
property_key=property_key, property_value=property_value
                     )
                 session.execute(insert_stmt)
             session.commit()
diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py
index 0fbda109..d1833df0 100644
--- a/pyiceberg/cli/console.py
+++ b/pyiceberg/cli/console.py
@@ -112,9 +112,13 @@ def list(ctx: Context, parent: Optional[str]) -> None:  # 
pylint: disable=redefi
     """List tables or namespaces."""
     catalog, output = _catalog_and_output(ctx)
 
-    identifiers = catalog.list_namespaces(parent or ())
-    if not identifiers and parent:
+    identifiers = []
+    if parent:
+        # Do we have tables under parent namespace?
         identifiers = catalog.list_tables(parent)
+    if not identifiers:
+        # List hierarchical namespaces if parent, root namespaces otherwise.
+        identifiers = catalog.list_namespaces(parent or ())
     output.identifiers(identifiers)
 
 
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index efa7b746..285cfd9a 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -17,7 +17,7 @@
 
 import os
 from pathlib import Path
-from typing import Generator, List
+from typing import Any, Generator, List
 
 import pyarrow as pa
 import pytest
@@ -25,6 +25,9 @@ from pydantic_core import ValidationError
 from pytest_lazyfixture import lazy_fixture
 from sqlalchemy.exc import ArgumentError, IntegrityError
 
+from pyiceberg.catalog import (
+    Catalog,
+)
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import (
     CommitFailedException,
@@ -52,51 +55,90 @@ from pyiceberg.typedef import Identifier
 from pyiceberg.types import IntegerType
 
 
-@pytest.fixture(name="random_identifier")
-def fixture_random_identifier(warehouse: Path, database_name: str, table_name: 
str) -> Identifier:
+@pytest.fixture(scope="module")
+def catalog_name() -> str:
+    return "test_sql_catalog"
+
+
+@pytest.fixture(name="random_table_identifier")
+def fixture_random_table_identifier(warehouse: Path, database_name: str, 
table_name: str) -> Identifier:
     os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/", 
exist_ok=True)
     return database_name, table_name
 
 
-@pytest.fixture(name="another_random_identifier")
-def fixture_another_random_identifier(warehouse: Path, database_name: str, 
table_name: str) -> Identifier:
+@pytest.fixture(name="random_table_identifier_with_catalog")
+def fixture_random_table_identifier_with_catalog(
+    warehouse: Path, catalog_name: str, database_name: str, table_name: str
+) -> Identifier:
+    os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/", 
exist_ok=True)
+    return catalog_name, database_name, table_name
+
+
+@pytest.fixture(name="another_random_table_identifier")
+def fixture_another_random_table_identifier(warehouse: Path, database_name: 
str, table_name: str) -> Identifier:
     database_name = database_name + "_new"
     table_name = table_name + "_new"
     os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/", 
exist_ok=True)
     return database_name, table_name
 
 
+@pytest.fixture(name="another_random_table_identifier_with_catalog")
+def fixture_another_random_table_identifier_with_catalog(
+    warehouse: Path, catalog_name: str, database_name: str, table_name: str
+) -> Identifier:
+    database_name = database_name + "_new"
+    table_name = table_name + "_new"
+    os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/", 
exist_ok=True)
+    return catalog_name, database_name, table_name
+
+
+@pytest.fixture(name="random_hierarchical_identifier")
+def fixture_random_hierarchical_identifier(warehouse: Path, 
hierarchical_namespace_name: str, table_name: str) -> Identifier:
+    
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}.db/{table_name}/metadata/",
 exist_ok=True)
+    return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, 
table_name)))
+
+
+@pytest.fixture(name="another_random_hierarchical_identifier")
+def fixture_another_random_hierarchical_identifier(
+    warehouse: Path, hierarchical_namespace_name: str, table_name: str
+) -> Identifier:
+    hierarchical_namespace_name = hierarchical_namespace_name + "_new"
+    table_name = table_name + "_new"
+    
os.makedirs(f"{warehouse}/{hierarchical_namespace_name}.db/{table_name}/metadata/",
 exist_ok=True)
+    return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, 
table_name)))
+
+
 @pytest.fixture(scope="module")
-def catalog_memory(warehouse: Path) -> Generator[SqlCatalog, None, None]:
+def catalog_memory(catalog_name: str, warehouse: Path) -> 
Generator[SqlCatalog, None, None]:
     props = {
         "uri": "sqlite:///:memory:",
         "warehouse": f"file://{warehouse}",
     }
-    catalog = SqlCatalog("test_sql_catalog", **props)
+    catalog = SqlCatalog(catalog_name, **props)
     catalog.create_tables()
     yield catalog
     catalog.destroy_tables()
 
 
 @pytest.fixture(scope="module")
-def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
+def catalog_sqlite(catalog_name: str, warehouse: Path) -> 
Generator[SqlCatalog, None, None]:
     props = {
         "uri": f"sqlite:////{warehouse}/sql-catalog.db",
         "warehouse": f"file://{warehouse}",
     }
-    catalog = SqlCatalog("test_sql_catalog", **props)
+    catalog = SqlCatalog(catalog_name, **props)
     catalog.create_tables()
     yield catalog
     catalog.destroy_tables()
 
 
 @pytest.fixture(scope="module")
-def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, 
None, None]:
+def catalog_sqlite_without_rowcount(catalog_name: str, warehouse: Path) -> 
Generator[SqlCatalog, None, None]:
     props = {
         "uri": f"sqlite:////{warehouse}/sql-catalog.db",
         "warehouse": f"file://{warehouse}",
     }
-    catalog = SqlCatalog("test_sql_catalog", **props)
+    catalog = SqlCatalog(catalog_name, **props)
     catalog.engine.dialect.supports_sane_rowcount = False
     catalog.create_tables()
     yield catalog
@@ -104,26 +146,26 @@ def catalog_sqlite_without_rowcount(warehouse: Path) -> 
Generator[SqlCatalog, No
 
 
 @pytest.fixture(scope="module")
-def catalog_sqlite_fsspec(warehouse: Path) -> Generator[SqlCatalog, None, None]:
+def catalog_sqlite_fsspec(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]:
     props = {
         "uri": f"sqlite:////{warehouse}/sql-catalog.db",
         "warehouse": f"file://{warehouse}",
         PY_IO_IMPL: FSSPEC_FILE_IO,
     }
-    catalog = SqlCatalog("test_sql_catalog", **props)
+    catalog = SqlCatalog(catalog_name, **props)
     catalog.create_tables()
     yield catalog
     catalog.destroy_tables()
 
 
-def test_creation_with_no_uri() -> None:
+def test_creation_with_no_uri(catalog_name: str) -> None:
     with pytest.raises(NoSuchPropertyException):
-        SqlCatalog("test_ddb_catalog", not_uri="unused")
+        SqlCatalog(catalog_name, not_uri="unused")
 
 
-def test_creation_with_unsupported_uri() -> None:
+def test_creation_with_unsupported_uri(catalog_name: str) -> None:
     with pytest.raises(ArgumentError):
-        SqlCatalog("test_ddb_catalog", uri="unsupported:xxx")
+        SqlCatalog(catalog_name, uri="unsupported:xxx")
 
 
 @pytest.mark.parametrize(
@@ -146,13 +188,22 @@ def test_create_tables_idempotency(catalog: SqlCatalog) 
-> None:
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_create_table_default_sort_order(catalog: SqlCatalog, 
table_schema_nested: Schema, random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table = catalog.create_table(random_identifier, table_schema_nested)
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_create_table_default_sort_order(catalog: SqlCatalog, 
table_schema_nested: Schema, table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(table_identifier, table_schema_nested)
     assert table.sort_order().order_id == 0, "Order ID must match"
     assert table.sort_order().is_unsorted is True, "Order must be unsorted"
-    catalog.drop_table(random_identifier)
+    catalog.drop_table(table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -162,15 +213,24 @@ def test_create_table_default_sort_order(catalog: 
SqlCatalog, table_schema_neste
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_create_v1_table(catalog: SqlCatalog, table_schema_nested: Schema, 
random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table = catalog.create_table(random_identifier, table_schema_nested, 
properties={"format-version": "1"})
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_create_v1_table(catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(table_identifier, table_schema_nested, 
properties={"format-version": "1"})
     assert table.sort_order().order_id == 0, "Order ID must match"
     assert table.sort_order().is_unsorted is True, "Order must be unsorted"
     assert table.format_version == 1
     assert table.spec() == UNPARTITIONED_PARTITION_SPEC
-    catalog.drop_table(random_identifier)
+    catalog.drop_table(table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -180,17 +240,26 @@ def test_create_v1_table(catalog: SqlCatalog, 
table_schema_nested: Schema, rando
         lazy_fixture('catalog_sqlite'),
     ],
 )
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
 def test_create_table_with_pyarrow_schema(
     catalog: SqlCatalog,
     pyarrow_schema_simple_without_ids: pa.Schema,
     iceberg_table_schema_simple: Schema,
-    random_identifier: Identifier,
+    table_identifier: Identifier,
 ) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table = catalog.create_table(random_identifier, 
pyarrow_schema_simple_without_ids)
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(table_identifier, 
pyarrow_schema_simple_without_ids)
     assert table.schema() == iceberg_table_schema_simple
-    catalog.drop_table(random_identifier)
+    catalog.drop_table(table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -200,7 +269,15 @@ def test_create_table_with_pyarrow_schema(
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_write_pyarrow_schema(catalog: SqlCatalog, random_identifier: 
Identifier) -> None:
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_write_pyarrow_schema(catalog: SqlCatalog, table_identifier: 
Identifier) -> None:
     import pyarrow as pa
 
     pyarrow_table = pa.Table.from_arrays(
@@ -217,9 +294,10 @@ def test_write_pyarrow_schema(catalog: SqlCatalog, 
random_identifier: Identifier
             pa.field('large', pa.large_string(), nullable=True),
         ]),
     )
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table = catalog.create_table(random_identifier, pyarrow_table.schema)
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(table_identifier, pyarrow_table.schema)
     table.overwrite(pyarrow_table)
 
 
@@ -230,18 +308,27 @@ def test_write_pyarrow_schema(catalog: SqlCatalog, 
random_identifier: Identifier
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_create_table_custom_sort_order(catalog: SqlCatalog, 
table_schema_nested: Schema, random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_create_table_custom_sort_order(catalog: SqlCatalog, 
table_schema_nested: Schema, table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
     order = SortOrder(SortField(source_id=2, transform=IdentityTransform(), 
null_order=NullOrder.NULLS_FIRST))
-    table = catalog.create_table(random_identifier, table_schema_nested, 
sort_order=order)
+    table = catalog.create_table(table_identifier, table_schema_nested, 
sort_order=order)
     given_sort_order = table.sort_order()
     assert given_sort_order.order_id == 1, "Order ID must match"
     assert len(given_sort_order.fields) == 1, "Order must have 1 field"
     assert given_sort_order.fields[0].direction == SortDirection.ASC, 
"Direction must match"
     assert given_sort_order.fields[0].null_order == NullOrder.NULLS_FIRST, 
"Null order must match"
     assert isinstance(given_sort_order.fields[0].transform, 
IdentityTransform), "Transform must match"
-    catalog.drop_table(random_identifier)
+    catalog.drop_table(table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -251,17 +338,26 @@ def test_create_table_custom_sort_order(catalog: 
SqlCatalog, table_schema_nested
         lazy_fixture('catalog_sqlite'),
     ],
 )
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
 def test_create_table_with_default_warehouse_location(
-    warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, 
random_identifier: Identifier
+    warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier
 ) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    catalog.create_table(random_identifier, table_schema_nested)
-    table = catalog.load_table(random_identifier)
-    assert table.identifier == (catalog.name,) + random_identifier
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    catalog.create_table(table_identifier, table_schema_nested)
+    table = catalog.load_table(table_identifier)
+    assert table.identifier == (catalog.name,) + table_identifier_nocatalog
     assert table.metadata_location.startswith(f"file://{warehouse}")
     assert os.path.exists(table.metadata_location[len("file://") :])
-    catalog.drop_table(random_identifier)
+    catalog.drop_table(table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -271,19 +367,29 @@ def test_create_table_with_default_warehouse_location(
         lazy_fixture('catalog_sqlite'),
     ],
 )
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
 def test_create_table_with_given_location_removes_trailing_slash(
-    warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, 
random_identifier: Identifier
+    warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier
 ) -> None:
-    database_name, table_name = random_identifier
-    location = f"file://{warehouse}/{database_name}.db/{table_name}-given"
-    catalog.create_namespace(database_name)
-    catalog.create_table(random_identifier, table_schema_nested, 
location=f"{location}/")
-    table = catalog.load_table(random_identifier)
-    assert table.identifier == (catalog.name,) + random_identifier
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    table_name = Catalog.table_name_from(table_identifier_nocatalog)
+    location = f"file://{warehouse}/{catalog.name}.db/{table_name}-given"
+    catalog.create_namespace(namespace)
+    catalog.create_table(table_identifier, table_schema_nested, 
location=f"{location}/")
+    table = catalog.load_table(table_identifier)
+    assert table.identifier == (catalog.name,) + table_identifier_nocatalog
     assert table.metadata_location.startswith(f"file://{warehouse}")
     assert os.path.exists(table.metadata_location[len("file://") :])
     assert table.location() == location
-    catalog.drop_table(random_identifier)
+    catalog.drop_table(table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -293,12 +399,21 @@ def 
test_create_table_with_given_location_removes_trailing_slash(
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: 
Schema, random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    catalog.create_table(random_identifier, table_schema_nested)
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: 
Schema, table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    catalog.create_table(table_identifier, table_schema_nested)
     with pytest.raises(TableAlreadyExistsError):
-        catalog.create_table(random_identifier, table_schema_nested)
+        catalog.create_table(table_identifier, table_schema_nested)
 
 
 @pytest.mark.parametrize(
@@ -308,13 +423,22 @@ def test_create_duplicated_table(catalog: SqlCatalog, 
table_schema_nested: Schem
         lazy_fixture('catalog_sqlite'),
     ],
 )
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
 def test_create_table_if_not_exists_duplicated_table(
-    catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: 
Identifier
+    catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: 
Identifier
 ) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table1 = catalog.create_table(random_identifier, table_schema_nested)
-    table2 = catalog.create_table_if_not_exists(random_identifier, 
table_schema_nested)
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table1 = catalog.create_table(table_identifier, table_schema_nested)
+    table2 = catalog.create_table_if_not_exists(table_identifier, 
table_schema_nested)
     assert table1.identifier == table2.identifier
 
 
@@ -339,7 +463,7 @@ def test_create_table_with_non_existing_namespace(catalog: 
SqlCatalog, table_sch
     ],
 )
 def test_create_table_without_namespace(catalog: SqlCatalog, 
table_schema_nested: Schema, table_name: str) -> None:
-    with pytest.raises(ValueError):
+    with pytest.raises(NoSuchNamespaceError):
         catalog.create_table(table_name, table_schema_nested)
 
 
@@ -350,14 +474,23 @@ def test_create_table_without_namespace(catalog: 
SqlCatalog, table_schema_nested
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_register_table(catalog: SqlCatalog, random_identifier: Identifier, 
metadata_location: str) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table = catalog.register_table(random_identifier, metadata_location)
-    assert table.identifier == (catalog.name,) + random_identifier
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_register_table(catalog: SqlCatalog, table_identifier: Identifier, 
metadata_location: str) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table = catalog.register_table(table_identifier, metadata_location)
+    assert table.identifier == (catalog.name,) + table_identifier_nocatalog
     assert table.metadata_location == metadata_location
     assert os.path.exists(metadata_location)
-    catalog.drop_table(random_identifier)
+    catalog.drop_table(table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -367,12 +500,21 @@ def test_register_table(catalog: SqlCatalog, 
random_identifier: Identifier, meta
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_register_existing_table(catalog: SqlCatalog, random_identifier: 
Identifier, metadata_location: str) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    catalog.register_table(random_identifier, metadata_location)
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_register_existing_table(catalog: SqlCatalog, table_identifier: 
Identifier, metadata_location: str) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    catalog.register_table(table_identifier, metadata_location)
     with pytest.raises(TableAlreadyExistsError):
-        catalog.register_table(random_identifier, metadata_location)
+        catalog.register_table(table_identifier, metadata_location)
 
 
 @pytest.mark.parametrize(
@@ -407,11 +549,20 @@ def test_register_table_without_namespace(catalog: 
SqlCatalog, metadata_location
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema, 
random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table = catalog.create_table(random_identifier, table_schema_nested)
-    loaded_table = catalog.load_table(random_identifier)
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(table_identifier, table_schema_nested)
+    loaded_table = catalog.load_table(table_identifier)
     assert table.identifier == loaded_table.identifier
     assert table.metadata_location == loaded_table.metadata_location
     assert table.metadata == loaded_table.metadata
@@ -424,12 +575,21 @@ def test_load_table(catalog: SqlCatalog, 
table_schema_nested: Schema, random_ide
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_load_table_from_self_identifier(catalog: SqlCatalog, 
table_schema_nested: Schema, random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table = catalog.create_table(random_identifier, table_schema_nested)
-    intermediate = catalog.load_table(random_identifier)
-    assert intermediate.identifier == (catalog.name,) + random_identifier
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_load_table_from_self_identifier(catalog: SqlCatalog, 
table_schema_nested: Schema, table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(table_identifier, table_schema_nested)
+    intermediate = catalog.load_table(table_identifier)
+    assert intermediate.identifier == (catalog.name,) + 
table_identifier_nocatalog
     loaded_table = catalog.load_table(intermediate.identifier)
     assert table.identifier == loaded_table.identifier
     assert table.metadata_location == loaded_table.metadata_location
@@ -444,14 +604,23 @@ def test_load_table_from_self_identifier(catalog: 
SqlCatalog, table_schema_neste
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
-def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, 
random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table = catalog.create_table(random_identifier, table_schema_nested)
-    assert table.identifier == (catalog.name,) + random_identifier
-    catalog.drop_table(random_identifier)
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(table_identifier, table_schema_nested)
+    assert table.identifier == (catalog.name,) + table_identifier_nocatalog
+    catalog.drop_table(table_identifier)
     with pytest.raises(NoSuchTableError):
-        catalog.load_table(random_identifier)
+        catalog.load_table(table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -462,16 +631,25 @@ def test_drop_table(catalog: SqlCatalog, 
table_schema_nested: Schema, random_ide
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
-def test_drop_table_from_self_identifier(catalog: SqlCatalog, 
table_schema_nested: Schema, random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table = catalog.create_table(random_identifier, table_schema_nested)
-    assert table.identifier == (catalog.name,) + random_identifier
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_drop_table_from_self_identifier(catalog: SqlCatalog, 
table_schema_nested: Schema, table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(table_identifier, table_schema_nested)
+    assert table.identifier == (catalog.name,) + table_identifier_nocatalog
     catalog.drop_table(table.identifier)
     with pytest.raises(NoSuchTableError):
         catalog.load_table(table.identifier)
     with pytest.raises(NoSuchTableError):
-        catalog.load_table(random_identifier)
+        catalog.load_table(table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -482,9 +660,17 @@ def test_drop_table_from_self_identifier(catalog: 
SqlCatalog, table_schema_neste
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
-def test_drop_table_that_does_not_exist(catalog: SqlCatalog, 
random_identifier: Identifier) -> None:
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_drop_table_that_does_not_exist(catalog: SqlCatalog, table_identifier: 
Identifier) -> None:
     with pytest.raises(NoSuchTableError):
-        catalog.drop_table(random_identifier)
+        catalog.drop_table(table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -495,21 +681,39 @@ def test_drop_table_that_does_not_exist(catalog: 
SqlCatalog, random_identifier:
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
+@pytest.mark.parametrize(
+    "from_table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+@pytest.mark.parametrize(
+    "to_table_identifier",
+    [
+        lazy_fixture("another_random_table_identifier"),
+        lazy_fixture("another_random_hierarchical_identifier"),
+        lazy_fixture("another_random_table_identifier_with_catalog"),
+    ],
+)
 def test_rename_table(
-    catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: 
Identifier, another_random_identifier: Identifier
+    catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: 
Identifier, to_table_identifier: Identifier
 ) -> None:
-    from_database_name, _from_table_name = random_identifier
-    to_database_name, _to_table_name = another_random_identifier
-    catalog.create_namespace(from_database_name)
-    catalog.create_namespace(to_database_name)
-    table = catalog.create_table(random_identifier, table_schema_nested)
-    assert table.identifier == (catalog.name,) + random_identifier
-    catalog.rename_table(random_identifier, another_random_identifier)
-    new_table = catalog.load_table(another_random_identifier)
-    assert new_table.identifier == (catalog.name,) + another_random_identifier
+    from_table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(from_table_identifier)
+    to_table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(to_table_identifier)
+    from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog)
+    to_namespace = Catalog.namespace_from(to_table_identifier_nocatalog)
+    catalog.create_namespace(from_namespace)
+    catalog.create_namespace(to_namespace)
+    table = catalog.create_table(from_table_identifier, table_schema_nested)
+    assert table.identifier == (catalog.name,) + 
from_table_identifier_nocatalog
+    catalog.rename_table(from_table_identifier, to_table_identifier)
+    new_table = catalog.load_table(to_table_identifier)
+    assert new_table.identifier == (catalog.name,) + 
to_table_identifier_nocatalog
     assert new_table.metadata_location == table.metadata_location
     with pytest.raises(NoSuchTableError):
-        catalog.load_table(random_identifier)
+        catalog.load_table(from_table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -520,23 +724,41 @@ def test_rename_table(
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
+@pytest.mark.parametrize(
+    "from_table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+@pytest.mark.parametrize(
+    "to_table_identifier",
+    [
+        lazy_fixture("another_random_table_identifier"),
+        lazy_fixture("another_random_hierarchical_identifier"),
+        lazy_fixture("another_random_table_identifier_with_catalog"),
+    ],
+)
 def test_rename_table_from_self_identifier(
-    catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: 
Identifier, another_random_identifier: Identifier
+    catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: 
Identifier, to_table_identifier: Identifier
 ) -> None:
-    from_database_name, _from_table_name = random_identifier
-    to_database_name, _to_table_name = another_random_identifier
-    catalog.create_namespace(from_database_name)
-    catalog.create_namespace(to_database_name)
-    table = catalog.create_table(random_identifier, table_schema_nested)
-    assert table.identifier == (catalog.name,) + random_identifier
-    catalog.rename_table(table.identifier, another_random_identifier)
-    new_table = catalog.load_table(another_random_identifier)
-    assert new_table.identifier == (catalog.name,) + another_random_identifier
+    from_table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(from_table_identifier)
+    to_table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(to_table_identifier)
+    from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog)
+    to_namespace = Catalog.namespace_from(to_table_identifier_nocatalog)
+    catalog.create_namespace(from_namespace)
+    catalog.create_namespace(to_namespace)
+    table = catalog.create_table(from_table_identifier, table_schema_nested)
+    assert table.identifier == (catalog.name,) + 
from_table_identifier_nocatalog
+    catalog.rename_table(table.identifier, to_table_identifier)
+    new_table = catalog.load_table(to_table_identifier)
+    assert new_table.identifier == (catalog.name,) + 
to_table_identifier_nocatalog
     assert new_table.metadata_location == table.metadata_location
     with pytest.raises(NoSuchTableError):
         catalog.load_table(table.identifier)
     with pytest.raises(NoSuchTableError):
-        catalog.load_table(random_identifier)
+        catalog.load_table(from_table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -547,19 +769,37 @@ def test_rename_table_from_self_identifier(
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
+@pytest.mark.parametrize(
+    "from_table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+@pytest.mark.parametrize(
+    "to_table_identifier",
+    [
+        lazy_fixture("another_random_table_identifier"),
+        lazy_fixture("another_random_hierarchical_identifier"),
+        lazy_fixture("another_random_table_identifier_with_catalog"),
+    ],
+)
 def test_rename_table_to_existing_one(
-    catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: 
Identifier, another_random_identifier: Identifier
+    catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: 
Identifier, to_table_identifier: Identifier
 ) -> None:
-    from_database_name, _from_table_name = random_identifier
-    to_database_name, _to_table_name = another_random_identifier
-    catalog.create_namespace(from_database_name)
-    catalog.create_namespace(to_database_name)
-    table = catalog.create_table(random_identifier, table_schema_nested)
-    assert table.identifier == (catalog.name,) + random_identifier
-    new_table = catalog.create_table(another_random_identifier, 
table_schema_nested)
-    assert new_table.identifier == (catalog.name,) + another_random_identifier
+    from_table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(from_table_identifier)
+    to_table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(to_table_identifier)
+    from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog)
+    to_namespace = Catalog.namespace_from(to_table_identifier_nocatalog)
+    catalog.create_namespace(from_namespace)
+    catalog.create_namespace(to_namespace)
+    table = catalog.create_table(from_table_identifier, table_schema_nested)
+    assert table.identifier == (catalog.name,) + 
from_table_identifier_nocatalog
+    new_table = catalog.create_table(to_table_identifier, table_schema_nested)
+    assert new_table.identifier == (catalog.name,) + 
to_table_identifier_nocatalog
     with pytest.raises(TableAlreadyExistsError):
-        catalog.rename_table(random_identifier, another_random_identifier)
+        catalog.rename_table(from_table_identifier, to_table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -570,11 +810,28 @@ def test_rename_table_to_existing_one(
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
-def test_rename_missing_table(catalog: SqlCatalog, random_identifier: 
Identifier, another_random_identifier: Identifier) -> None:
-    to_database_name, _to_table_name = another_random_identifier
-    catalog.create_namespace(to_database_name)
+@pytest.mark.parametrize(
+    "from_table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+@pytest.mark.parametrize(
+    "to_table_identifier",
+    [
+        lazy_fixture("another_random_table_identifier"),
+        lazy_fixture("another_random_hierarchical_identifier"),
+        lazy_fixture("another_random_table_identifier_with_catalog"),
+    ],
+)
+def test_rename_missing_table(catalog: SqlCatalog, from_table_identifier: 
Identifier, to_table_identifier: Identifier) -> None:
+    to_table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(to_table_identifier)
+    to_namespace = Catalog.namespace_from(to_table_identifier_nocatalog)
+    catalog.create_namespace(to_namespace)
     with pytest.raises(NoSuchTableError):
-        catalog.rename_table(random_identifier, another_random_identifier)
+        catalog.rename_table(from_table_identifier, to_table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -585,15 +842,32 @@ def test_rename_missing_table(catalog: SqlCatalog, 
random_identifier: Identifier
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
+@pytest.mark.parametrize(
+    "from_table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+@pytest.mark.parametrize(
+    "to_table_identifier",
+    [
+        lazy_fixture("another_random_table_identifier"),
+        lazy_fixture("another_random_hierarchical_identifier"),
+        lazy_fixture("another_random_table_identifier_with_catalog"),
+    ],
+)
 def test_rename_table_to_missing_namespace(
-    catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: 
Identifier, another_random_identifier: Identifier
+    catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: 
Identifier, to_table_identifier: Identifier
 ) -> None:
-    from_database_name, _from_table_name = random_identifier
-    catalog.create_namespace(from_database_name)
-    table = catalog.create_table(random_identifier, table_schema_nested)
-    assert table.identifier == (catalog.name,) + random_identifier
+    from_table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(from_table_identifier)
+    from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog)
+    catalog.create_namespace(from_namespace)
+    table = catalog.create_table(from_table_identifier, table_schema_nested)
+    assert table.identifier == (catalog.name,) + 
from_table_identifier_nocatalog
     with pytest.raises(NoSuchNamespaceError):
-        catalog.rename_table(random_identifier, another_random_identifier)
+        catalog.rename_table(from_table_identifier, to_table_identifier)
 
 
 @pytest.mark.parametrize(
@@ -603,22 +877,40 @@ def test_rename_table_to_missing_namespace(
         lazy_fixture('catalog_sqlite'),
     ],
 )
+@pytest.mark.parametrize(
+    "table_identifier_1",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+@pytest.mark.parametrize(
+    "table_identifier_2",
+    [
+        lazy_fixture("another_random_table_identifier"),
+        lazy_fixture("another_random_hierarchical_identifier"),
+        lazy_fixture("another_random_table_identifier_with_catalog"),
+    ],
+)
 def test_list_tables(
-    catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: 
Identifier, another_random_identifier: Identifier
+    catalog: SqlCatalog, table_schema_nested: Schema, table_identifier_1: 
Identifier, table_identifier_2: Identifier
 ) -> None:
-    database_name_1, _table_name_1 = random_identifier
-    database_name_2, _table_name_2 = another_random_identifier
-    catalog.create_namespace(database_name_1)
-    catalog.create_namespace(database_name_2)
-    catalog.create_table(random_identifier, table_schema_nested)
-    catalog.create_table(another_random_identifier, table_schema_nested)
-    identifier_list = catalog.list_tables(database_name_1)
+    table_identifier_1_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier_1)
+    table_identifier_2_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier_2)
+    namespace_1 = Catalog.namespace_from(table_identifier_1_nocatalog)
+    namespace_2 = Catalog.namespace_from(table_identifier_2_nocatalog)
+    catalog.create_namespace(namespace_1)
+    catalog.create_namespace(namespace_2)
+    catalog.create_table(table_identifier_1, table_schema_nested)
+    catalog.create_table(table_identifier_2, table_schema_nested)
+    identifier_list = catalog.list_tables(namespace_1)
     assert len(identifier_list) == 1
-    assert random_identifier in identifier_list
+    assert table_identifier_1_nocatalog in identifier_list
 
-    identifier_list = catalog.list_tables(database_name_2)
+    identifier_list = catalog.list_tables(namespace_2)
     assert len(identifier_list) == 1
-    assert another_random_identifier in identifier_list
+    assert table_identifier_2_nocatalog in identifier_list
 
 
 @pytest.mark.parametrize(
@@ -628,9 +920,10 @@ def test_list_tables(
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_create_namespace(catalog: SqlCatalog, database_name: str) -> None:
-    catalog.create_namespace(database_name)
-    assert (database_name,) in catalog.list_namespaces()
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
+def test_list_tables_when_missing_namespace(catalog: SqlCatalog, namespace: 
str) -> None:
+    with pytest.raises(NoSuchNamespaceError):
+        catalog.list_tables(namespace)
 
 
 @pytest.mark.parametrize(
@@ -654,10 +947,24 @@ def test_create_namespace_if_not_exists(catalog: 
SqlCatalog, database_name: str)
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_create_duplicate_namespace(catalog: SqlCatalog, database_name: str) 
-> None:
-    catalog.create_namespace(database_name)
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
+def test_create_namespace(catalog: SqlCatalog, namespace: str) -> None:
+    catalog.create_namespace(namespace)
+    assert (Catalog.identifier_to_tuple(namespace)) in 
catalog.list_namespaces()
+
+
+@pytest.mark.parametrize(
+    'catalog',
+    [
+        lazy_fixture('catalog_memory'),
+        lazy_fixture('catalog_sqlite'),
+    ],
+)
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
+def test_create_duplicate_namespace(catalog: SqlCatalog, namespace: str) -> 
None:
+    catalog.create_namespace(namespace)
     with pytest.raises(NamespaceAlreadyExistsError):
-        catalog.create_namespace(database_name)
+        catalog.create_namespace(namespace)
 
 
 @pytest.mark.parametrize(
@@ -667,10 +974,11 @@ def test_create_duplicate_namespace(catalog: SqlCatalog, 
database_name: str) ->
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_create_namespaces_sharing_same_prefix(catalog: SqlCatalog, 
database_name: str) -> None:
-    catalog.create_namespace(database_name + "_1")
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
+def test_create_namespaces_sharing_same_prefix(catalog: SqlCatalog, namespace: 
str) -> None:
+    catalog.create_namespace(namespace + "_1")
     # Second namespace is a prefix of the first one, make sure it can be added.
-    catalog.create_namespace(database_name)
+    catalog.create_namespace(namespace)
 
 
 @pytest.mark.parametrize(
@@ -680,16 +988,17 @@ def test_create_namespaces_sharing_same_prefix(catalog: 
SqlCatalog, database_nam
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_create_namespace_with_comment_and_location(catalog: SqlCatalog, 
database_name: str) -> None:
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
+def test_create_namespace_with_comment_and_location(catalog: SqlCatalog, 
namespace: str) -> None:
     test_location = "/test/location"
     test_properties = {
         "comment": "this is a test description",
         "location": test_location,
     }
-    catalog.create_namespace(namespace=database_name, 
properties=test_properties)
+    catalog.create_namespace(namespace=namespace, properties=test_properties)
     loaded_database_list = catalog.list_namespaces()
-    assert (database_name,) in loaded_database_list
-    properties = catalog.load_namespace_properties(database_name)
+    assert Catalog.identifier_to_tuple(namespace) in loaded_database_list
+    properties = catalog.load_namespace_properties(namespace)
     assert properties["comment"] == "this is a test description"
     assert properties["location"] == test_location
 
@@ -701,13 +1010,27 @@ def 
test_create_namespace_with_comment_and_location(catalog: SqlCatalog, databas
         lazy_fixture('catalog_sqlite'),
     ],
 )
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
 @pytest.mark.filterwarnings("ignore")
-def test_create_namespace_with_null_properties(catalog: SqlCatalog, 
database_name: str) -> None:
+def test_create_namespace_with_null_properties(catalog: SqlCatalog, namespace: 
str) -> None:
     with pytest.raises(IntegrityError):
-        catalog.create_namespace(namespace=database_name, properties={None: 
"value"})  # type: ignore
+        catalog.create_namespace(namespace=namespace, properties={None: 
"value"})  # type: ignore
 
     with pytest.raises(IntegrityError):
-        catalog.create_namespace(namespace=database_name, properties={"key": 
None})
+        catalog.create_namespace(namespace=namespace, properties={"key": None})
+
+
+@pytest.mark.parametrize(
+    'catalog',
+    [
+        lazy_fixture('catalog_memory'),
+        lazy_fixture('catalog_sqlite'),
+    ],
+)
+@pytest.mark.parametrize("empty_namespace", ["", (), (""), ("", ""), " ", (" ")])
+def test_create_namespace_with_empty_identifier(catalog: SqlCatalog, 
empty_namespace: Any) -> None:
+    with pytest.raises(NoSuchNamespaceError):
+        catalog.create_namespace(empty_namespace)
 
 
 @pytest.mark.parametrize(
@@ -717,13 +1040,17 @@ def test_create_namespace_with_null_properties(catalog: 
SqlCatalog, database_nam
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_list_namespaces(catalog: SqlCatalog, database_list: List[str]) -> 
None:
-    for database_name in database_list:
-        catalog.create_namespace(database_name)
-    db_list = catalog.list_namespaces()
-    for database_name in database_list:
-        assert (database_name,) in db_list
-        assert len(catalog.list_namespaces(database_name)) == 1
+@pytest.mark.parametrize("namespace_list", [lazy_fixture("database_list"), lazy_fixture("hierarchical_namespace_list")])
+def test_list_namespaces(catalog: SqlCatalog, namespace_list: List[str]) -> 
None:
+    for namespace in namespace_list:
+        catalog.create_namespace(namespace)
+    # Test global list
+    ns_list = catalog.list_namespaces()
+    for namespace in namespace_list:
+        assert Catalog.identifier_to_tuple(namespace) in ns_list
+        # Test individual namespace list
+        assert len(one_namespace := catalog.list_namespaces(namespace)) == 1
+        assert Catalog.identifier_to_tuple(namespace) == one_namespace[0]
 
 
 @pytest.mark.parametrize(
@@ -745,16 +1072,25 @@ def test_list_non_existing_namespaces(catalog: 
SqlCatalog) -> None:
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_drop_namespace(catalog: SqlCatalog, table_schema_nested: Schema, 
random_identifier: Identifier) -> None:
-    database_name, table_name = random_identifier
-    catalog.create_namespace(database_name)
-    assert (database_name,) in catalog.list_namespaces()
-    catalog.create_table((database_name, table_name), table_schema_nested)
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_drop_namespace(catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    assert namespace in catalog.list_namespaces()
+    catalog.create_table(table_identifier, table_schema_nested)
     with pytest.raises(NamespaceNotEmptyError):
-        catalog.drop_namespace(database_name)
-    catalog.drop_table((database_name, table_name))
-    catalog.drop_namespace(database_name)
-    assert (database_name,) not in catalog.list_namespaces()
+        catalog.drop_namespace(namespace)
+    catalog.drop_table(table_identifier)
+    catalog.drop_namespace(namespace)
+    assert namespace not in catalog.list_namespaces()
 
 
 @pytest.mark.parametrize(
@@ -764,18 +1100,19 @@ def test_drop_namespace(catalog: SqlCatalog, 
table_schema_nested: Schema, random
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_load_namespace_properties(catalog: SqlCatalog, database_name: str) -> 
None:
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
+def test_load_namespace_properties(catalog: SqlCatalog, namespace: str) -> 
None:
     warehouse_location = "/test/location"
     test_properties = {
         "comment": "this is a test description",
-        "location": f"{warehouse_location}/{database_name}.db",
+        "location": f"{warehouse_location}/{namespace}.db",
         "test_property1": "1",
         "test_property2": "2",
         "test_property3": "3",
     }
 
-    catalog.create_namespace(database_name, test_properties)
-    listed_properties = catalog.load_namespace_properties(database_name)
+    catalog.create_namespace(namespace, test_properties)
+    listed_properties = catalog.load_namespace_properties(namespace)
     for k, v in listed_properties.items():
         assert k in test_properties
         assert v == test_properties[k]
@@ -788,9 +1125,10 @@ def test_load_namespace_properties(catalog: SqlCatalog, 
database_name: str) -> N
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_load_empty_namespace_properties(catalog: SqlCatalog, database_name: 
str) -> None:
-    catalog.create_namespace(database_name)
-    listed_properties = catalog.load_namespace_properties(database_name)
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
+def test_load_empty_namespace_properties(catalog: SqlCatalog, namespace: str) 
-> None:
+    catalog.create_namespace(namespace)
+    listed_properties = catalog.load_namespace_properties(namespace)
     assert listed_properties == {"exists": "true"}
 
 
@@ -813,19 +1151,20 @@ def 
test_load_namespace_properties_non_existing_namespace(catalog: SqlCatalog) -
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_update_namespace_properties(catalog: SqlCatalog, database_name: str) 
-> None:
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")])
+def test_update_namespace_properties(catalog: SqlCatalog, namespace: str) -> 
None:
     warehouse_location = "/test/location"
     test_properties = {
         "comment": "this is a test description",
-        "location": f"{warehouse_location}/{database_name}.db",
+        "location": f"{warehouse_location}/{namespace}.db",
         "test_property1": "1",
         "test_property2": "2",
         "test_property3": "3",
     }
     removals = {"test_property1", "test_property2", "test_property3", 
"should_not_removed"}
     updates = {"test_property4": "4", "test_property5": "5", "comment": 
"updated test description"}
-    catalog.create_namespace(database_name, test_properties)
-    update_report = catalog.update_namespace_properties(database_name, 
removals, updates)
+    catalog.create_namespace(namespace, test_properties)
+    update_report = catalog.update_namespace_properties(namespace, removals, 
updates)
     for k in updates.keys():
         assert k in update_report.updated
     for k in removals:
@@ -833,7 +1172,7 @@ def test_update_namespace_properties(catalog: SqlCatalog, 
database_name: str) ->
             assert k in update_report.missing
         else:
             assert k in update_report.removed
-    assert "updated test description" == 
catalog.load_namespace_properties(database_name)["comment"]
+    assert "updated test description" == 
catalog.load_namespace_properties(namespace)["comment"]
 
 
 @pytest.mark.parametrize(
@@ -844,10 +1183,19 @@ def test_update_namespace_properties(catalog: 
SqlCatalog, database_name: str) ->
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
-def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, 
random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table = catalog.create_table(random_identifier, table_schema_nested)
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, 
table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(table_identifier, table_schema_nested)
 
     assert catalog._parse_metadata_version(table.metadata_location) == 0
     assert table.metadata.current_schema_id == 0
@@ -878,10 +1226,19 @@ def test_commit_table(catalog: SqlCatalog, 
table_schema_nested: Schema, random_i
         lazy_fixture('catalog_sqlite_fsspec'),
     ],
 )
-def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, 
random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table = catalog.create_table(random_identifier, table_schema_simple)
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, 
table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table = catalog.create_table(table_identifier, table_schema_simple)
 
     df = pa.Table.from_pydict(
         {
@@ -918,11 +1275,20 @@ def test_append_table(catalog: SqlCatalog, 
table_schema_simple: Schema, random_i
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
-def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple: 
Schema, random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    table_a = catalog.create_table(random_identifier, table_schema_simple)
-    table_b = catalog.load_table(random_identifier)
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple: 
Schema, table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    table_a = catalog.create_table(table_identifier, table_schema_simple)
+    table_b = catalog.load_table(table_identifier)
 
     with table_a.update_schema() as update:
         update.add_column(path="b", field_type=IntegerType())
@@ -992,12 +1358,21 @@ def test_write_and_evolve(catalog: SqlCatalog, 
format_version: int) -> None:
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
-def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple: 
Schema, random_identifier: Identifier) -> None:
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple: 
Schema, table_identifier: Identifier) -> None:
     # table properties can be set to int, but still serialized to string
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
     property_with_int = {"property_name": 42}
-    table = catalog.create_table(random_identifier, table_schema_simple, 
properties=property_with_int)
+    table = catalog.create_table(table_identifier, table_schema_simple, 
properties=property_with_int)
     assert isinstance(table.properties["property_name"], str)
 
 
@@ -1009,14 +1384,23 @@ def test_table_properties_int_value(catalog: 
SqlCatalog, table_schema_simple: Sc
         lazy_fixture('catalog_sqlite_without_rowcount'),
     ],
 )
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
 def test_table_properties_raise_for_none_value(
-    catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: 
Identifier
+    catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: 
Identifier
 ) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
     property_with_none = {"property_name": None}
     with pytest.raises(ValidationError) as exc_info:
-        _ = catalog.create_table(random_identifier, table_schema_simple, 
properties=property_with_none)
+        _ = catalog.create_table(table_identifier, table_schema_simple, 
properties=property_with_none)
     assert "None type is not a supported value in properties: property_name" 
in str(exc_info.value)
 
 
@@ -1027,11 +1411,20 @@ def test_table_properties_raise_for_none_value(
         lazy_fixture('catalog_sqlite'),
     ],
 )
-def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema, 
random_identifier: Identifier) -> None:
-    database_name, _table_name = random_identifier
-    catalog.create_namespace(database_name)
-    catalog.create_table(random_identifier, table_schema_simple, 
properties={"format-version": "2"})
-    existing_table = random_identifier
+@pytest.mark.parametrize(
+    "table_identifier",
+    [
+        lazy_fixture("random_table_identifier"),
+        lazy_fixture("random_hierarchical_identifier"),
+        lazy_fixture("random_table_identifier_with_catalog"),
+    ],
+)
+def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema, 
table_identifier: Identifier) -> None:
+    table_identifier_nocatalog = 
catalog.identifier_to_tuple_without_catalog(table_identifier)
+    namespace = Catalog.namespace_from(table_identifier_nocatalog)
+    catalog.create_namespace(namespace)
+    catalog.create_table(table_identifier, table_schema_simple, 
properties={"format-version": "2"})
+    existing_table = table_identifier
     # Act and Assert for an existing table
     assert catalog.table_exists(existing_table) is True
 
diff --git a/tests/conftest.py b/tests/conftest.py
index 66795436..4baefafe 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1878,6 +1878,19 @@ def database_list(database_name: str) -> List[str]:
     return [f"{database_name}_{idx}" for idx in range(NUM_TABLES)]
 
 
+@pytest.fixture()
+def hierarchical_namespace_name() -> str:
+    prefix = "my_iceberg_ns-"
+    random_tag1 = "".join(choice(string.ascii_letters) for _ in 
range(RANDOM_LENGTH))
+    random_tag2 = "".join(choice(string.ascii_letters) for _ in 
range(RANDOM_LENGTH))
+    return ".".join([prefix + random_tag1, prefix + random_tag2]).lower()
+
+
+@pytest.fixture()
+def hierarchical_namespace_list(hierarchical_namespace_name: str) -> List[str]:
+    return [f"{hierarchical_namespace_name}_{idx}" for idx in 
range(NUM_TABLES)]
+
+
 BUCKET_NAME = "test_bucket"
 TABLE_METADATA_LOCATION_REGEX = re.compile(
     r"""s3://test_bucket/my_iceberg_database-[a-z]{20}.db/


Reply via email to