This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new a57cb07  Support catalog in identifier to locate existing tables (#150)
a57cb07 is described below

commit a57cb0745efba8dd115d1089d70f4a390cd3e99a
Author: Patrick Ames <[email protected]>
AuthorDate: Sat Nov 25 12:51:11 2023 -0800

    Support catalog in identifier to locate existing tables (#150)
---
 pyiceberg/catalog/__init__.py  |  19 +-
 pyiceberg/catalog/dynamodb.py  |  11 +-
 pyiceberg/catalog/glue.py      |   9 +-
 pyiceberg/catalog/hive.py      |   9 +-
 pyiceberg/catalog/rest.py      |  33 +--
 pyiceberg/catalog/sql.py       |   9 +-
 tests/catalog/test_base.py     |  53 ++++-
 tests/catalog/test_dynamodb.py |  65 ++++++
 tests/catalog/test_glue.py     |  62 ++++++
 tests/catalog/test_hive.py     | 165 +++++++++++++-
 tests/catalog/test_rest.py     | 496 +++++++++++++----------------------------
 tests/catalog/test_sql.py      |  47 ++++
 tests/conftest.py              | 127 +++++++++++
 13 files changed, 730 insertions(+), 375 deletions(-)
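
In short, catalog methods now accept identifiers that carry the catalog's own name as a leading element, so the identifier a Table reports about itself can be passed straight back into load_table, drop_table, purge_table, and rename_table. A minimal usage sketch (the catalog name "prod" and table "db.tbl" are illustrative and assume a configured catalog):

    from pyiceberg.catalog import load_catalog

    catalog = load_catalog("prod")          # catalog named "prod"
    table = catalog.load_table("db.tbl")    # plain namespace.table identifier
    print(table.identifier)                 # ("prod", "db", "tbl") - includes the catalog name

    # Previously, passing the table's own identifier back would fail (typically with
    # NoSuchTableError) because of the leading catalog name; with this change the
    # catalog prefix is stripped before lookup.
    same_table = catalog.load_table(table.identifier)
    catalog.drop_table(table.identifier)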

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 2577a97..c83fd1c 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -536,6 +536,20 @@ class Catalog(ABC):
 
         return tuple_identifier[0], tuple_identifier[1]
 
+    def identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
+        """Convert an identifier to a tuple and drop this catalog's name from the first element.
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+
+        Returns:
+            Identifier: a tuple of strings with this catalog's name removed
+        """
+        identifier_tuple = Catalog.identifier_to_tuple(identifier)
+        if len(identifier_tuple) >= 3 and identifier_tuple[0] == self.name:
+            identifier_tuple = identifier_tuple[1:]
+        return identifier_tuple
+
     def purge_table(self, identifier: Union[str, Identifier]) -> None:
         """Drop a table and purge all data and metadata files.
 
@@ -547,8 +561,9 @@ class Catalog(ABC):
         Raises:
             NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
         """
-        table = self.load_table(identifier)
-        self.drop_table(identifier)
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+        table = self.load_table(identifier_tuple)
+        self.drop_table(identifier_tuple)
         io = load_file_io(self.properties, table.metadata_location)
         metadata = table.metadata
         manifest_lists_to_delete = set()
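
The stripping rule added above only removes the first element when it matches this catalog's name and at least a namespace and table name remain; a short self-contained sketch of that logic (the catalog name "prod" is illustrative):

    name = "prod"
    for identifier in [("prod", "db", "tbl"), ("db", "tbl"), ("prod", "tbl")]:
        if len(identifier) >= 3 and identifier[0] == name:
            identifier = identifier[1:]
        print(identifier)
    # Only the three-part, catalog-prefixed identifier is shortened; the others pass through unchanged.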
diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py
index 848ec03..3eee95d 100644
--- a/pyiceberg/catalog/dynamodb.py
+++ b/pyiceberg/catalog/dynamodb.py
@@ -213,7 +213,8 @@ class DynamoDbCatalog(Catalog):
         Raises:
             NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
         """
-        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+        database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
         dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
         return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)
 
@@ -226,7 +227,8 @@ class DynamoDbCatalog(Catalog):
         Raises:
             NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
         """
-        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+        database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
 
         try:
             self._delete_dynamo_item(
@@ -256,7 +258,8 @@ class DynamoDbCatalog(Catalog):
             NoSuchPropertyException: When from table miss some required properties.
             NoSuchNamespaceError: When the destination namespace doesn't exist.
         """
-        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
+        from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
+        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
         to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
 
         from_table_item = self._get_iceberg_table_item(database_name=from_database_name, table_name=from_table_name)
@@ -287,7 +290,7 @@ class DynamoDbCatalog(Catalog):
             raise TableAlreadyExistsError(f"Table 
{to_database_name}.{to_table_name} already exists") from e
 
         try:
-            self.drop_table(from_identifier)
+            self.drop_table(from_identifier_tuple)
         except (NoSuchTableError, GenericDynamoDbError) as e:
             log_message = f"Failed to drop old table 
{from_database_name}.{from_table_name}. "
 
diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index e068363..723de2f 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -265,7 +265,8 @@ class GlueCatalog(Catalog):
         Raises:
             NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
         """
-        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+        database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
         try:
             load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
         except self.glue.exceptions.EntityNotFoundException as e:
@@ -282,7 +283,8 @@ class GlueCatalog(Catalog):
         Raises:
             NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
         """
-        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+        database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
         try:
             self.glue.delete_table(DatabaseName=database_name, Name=table_name)
         except self.glue.exceptions.EntityNotFoundException as e:
@@ -307,7 +309,8 @@ class GlueCatalog(Catalog):
             NoSuchPropertyException: When from table miss some required properties.
             NoSuchNamespaceError: When the destination namespace doesn't exist.
         """
-        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
+        from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
+        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
         to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
         try:
             get_table_response = self.glue.get_table(DatabaseName=from_database_name, Name=from_table_name)
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 21f1714..ffc9c53 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -347,7 +347,8 @@ class HiveCatalog(Catalog):
         Raises:
             NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
         """
-        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+        database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
         try:
             with self._client as open_client:
                 hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
@@ -366,7 +367,8 @@ class HiveCatalog(Catalog):
         Raises:
             NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
         """
-        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+        database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
         try:
             with self._client as open_client:
                 open_client.drop_table(dbname=database_name, name=table_name, deleteData=False)
@@ -393,7 +395,8 @@ class HiveCatalog(Catalog):
             NoSuchTableError: When a table with the name does not exist.
             NoSuchNamespaceError: When the destination namespace doesn't exist.
         """
-        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
+        from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
+        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
         to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
         try:
             with self._client as open_client:
diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py
index 77025b5..3dbb5b7 100644
--- a/pyiceberg/catalog/rest.py
+++ b/pyiceberg/catalog/rest.py
@@ -302,19 +302,20 @@ class RestCatalog(Catalog):
         # Update URI based on overrides
         self.uri = config[URI]
 
-    def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties:
-        if isinstance(identifier, TableIdentifier):
-            return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name}
-
+    def _identifier_to_validated_tuple(self, identifier: Union[str, Identifier]) -> Identifier:
         identifier_tuple = self.identifier_to_tuple(identifier)
         if len(identifier_tuple) <= 1:
             raise NoSuchTableError(f"Missing namespace or invalid identifier: {'.'.join(identifier_tuple)}")
+        return identifier_tuple
+
+    def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties:
+        if isinstance(identifier, TableIdentifier):
+            return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name}
+        identifier_tuple = self._identifier_to_validated_tuple(identifier)
         return {"namespace": NAMESPACE_SEPARATOR.join(identifier_tuple[:-1]), "table": identifier_tuple[-1]}
 
     def _split_identifier_for_json(self, identifier: Union[str, Identifier]) -> Dict[str, Union[Identifier, str]]:
-        identifier_tuple = self.identifier_to_tuple(identifier)
-        if len(identifier_tuple) <= 1:
-            raise NoSuchTableError(f"Missing namespace or invalid identifier: {identifier_tuple}")
+        identifier_tuple = self._identifier_to_validated_tuple(identifier)
         return {"namespace": identifier_tuple[:-1], "name": identifier_tuple[-1]}
 
     def _handle_non_200_response(self, exc: HTTPError, error_handler: Dict[int, Type[Exception]]) -> None:
@@ -499,12 +500,10 @@ class RestCatalog(Catalog):
         return [(*table.namespace, table.name) for table in ListTablesResponse(**response.json()).identifiers]
 
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
-        identifier_tuple = self.identifier_to_tuple(identifier)
-
-        if len(identifier_tuple) <= 1:
-            raise NoSuchTableError(f"Missing namespace or invalid identifier: 
{identifier}")
-
-        response = self._session.get(self.url(Endpoints.load_table, 
prefixed=True, **self._split_identifier_for_path(identifier)))
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+        response = self._session.get(
+            self.url(Endpoints.load_table, prefixed=True, 
**self._split_identifier_for_path(identifier_tuple))
+        )
         try:
             response.raise_for_status()
         except HTTPError as exc:
@@ -514,8 +513,11 @@ class RestCatalog(Catalog):
         return self._response_to_table(identifier_tuple, table_response)
 
     def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None:
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
         response = self._session.delete(
-            self.url(Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier)),
+            self.url(
+                Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier_tuple)
+            ),
         )
         try:
             response.raise_for_status()
@@ -526,8 +528,9 @@ class RestCatalog(Catalog):
         self.drop_table(identifier=identifier, purge_requested=True)
 
     def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
+        from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
         payload = {
-            "source": self._split_identifier_for_json(from_identifier),
+            "source": self._split_identifier_for_json(from_identifier_tuple),
             "destination": self._split_identifier_for_json(to_identifier),
         }
         response = self._session.post(self.url(Endpoints.rename_table), json=payload)
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index bca0fe4..b0c01eb 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -231,7 +231,8 @@ class SqlCatalog(Catalog):
         Raises:
             NoSuchTableError: If a table with the name does not exist.
         """
-        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+        database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
         with Session(self.engine) as session:
             stmt = select(IcebergTables).where(
                 IcebergTables.catalog_name == self.name,
@@ -252,7 +253,8 @@ class SqlCatalog(Catalog):
         Raises:
             NoSuchTableError: If a table with the name does not exist.
         """
-        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+        identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+        database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
         with Session(self.engine) as session:
             res = session.execute(
                 delete(IcebergTables).where(
@@ -280,7 +282,8 @@ class SqlCatalog(Catalog):
             TableAlreadyExistsError: If a table with the new name already exist.
             NoSuchNamespaceError: If the target namespace does not exist.
         """
-        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
+        from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
+        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
         to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
         if not self._namespace_exists(to_database_name):
             raise NoSuchNamespaceError(f"Namespace does not exist: 
{to_database_name}")
diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py
index 1078dd1..c7cd1c4 100644
--- a/tests/catalog/test_base.py
+++ b/tests/catalog/test_base.py
@@ -149,14 +149,14 @@ class InMemoryCatalog(Catalog):
         )
 
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
-        identifier = Catalog.identifier_to_tuple(identifier)
+        identifier = self.identifier_to_tuple_without_catalog(identifier)
         try:
             return self.__tables[identifier]
         except KeyError as error:
             raise NoSuchTableError(f"Table does not exist: {identifier}") from 
error
 
     def drop_table(self, identifier: Union[str, Identifier]) -> None:
-        identifier = Catalog.identifier_to_tuple(identifier)
+        identifier = self.identifier_to_tuple_without_catalog(identifier)
         try:
             self.__tables.pop(identifier)
         except KeyError as error:
@@ -166,7 +166,7 @@ class InMemoryCatalog(Catalog):
         self.drop_table(identifier)
 
     def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
-        from_identifier = Catalog.identifier_to_tuple(from_identifier)
+        from_identifier = self.identifier_to_tuple_without_catalog(from_identifier)
         try:
             table = self.__tables.pop(from_identifier)
         except KeyError as error:
@@ -352,6 +352,16 @@ def test_load_table(catalog: InMemoryCatalog) -> None:
     assert table == given_table
 
 
+def test_load_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
+    # Given
+    given_table = given_catalog_has_a_table(catalog)
+    # When
+    intermediate = catalog.load_table(TEST_TABLE_IDENTIFIER)
+    table = catalog.load_table(intermediate.identifier)
+    # Then
+    assert table == given_table
+
+
 def test_table_raises_error_on_table_not_found(catalog: InMemoryCatalog) -> None:
     with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
         catalog.load_table(TEST_TABLE_IDENTIFIER)
@@ -367,6 +377,18 @@ def test_drop_table(catalog: InMemoryCatalog) -> None:
         catalog.load_table(TEST_TABLE_IDENTIFIER)
 
 
+def test_drop_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
+    # Given
+    table = given_catalog_has_a_table(catalog)
+    # When
+    catalog.drop_table(table.identifier)
+    # Then
+    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
+        catalog.load_table(table.identifier)
+    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
+        catalog.load_table(TEST_TABLE_IDENTIFIER)
+
+
 def test_drop_table_that_does_not_exist_raise_error(catalog: InMemoryCatalog) -> None:
     with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
         catalog.load_table(TEST_TABLE_IDENTIFIER)
@@ -405,6 +427,31 @@ def test_rename_table(catalog: InMemoryCatalog) -> None:
         catalog.load_table(TEST_TABLE_IDENTIFIER)
 
 
+def test_rename_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
+    # Given
+    table = given_catalog_has_a_table(catalog)
+
+    # When
+    new_table_name = "new.namespace.new_table"
+    new_table = catalog.rename_table(table.identifier, new_table_name)
+
+    # Then
+    assert new_table.identifier == Catalog.identifier_to_tuple(new_table_name)
+
+    # And
+    new_table = catalog.load_table(new_table.identifier)
+    assert new_table.identifier == Catalog.identifier_to_tuple(new_table_name)
+
+    # And
+    assert ("new", "namespace") in catalog.list_namespaces()
+
+    # And
+    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
+        catalog.load_table(table.identifier)
+    with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
+        catalog.load_table(TEST_TABLE_IDENTIFIER)
+
+
 def test_create_namespace(catalog: InMemoryCatalog) -> None:
     # When
     catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
diff --git a/tests/catalog/test_dynamodb.py b/tests/catalog/test_dynamodb.py
index 582cb03..f03d1d9 100644
--- a/tests/catalog/test_dynamodb.py
+++ b/tests/catalog/test_dynamodb.py
@@ -175,6 +175,23 @@ def test_load_table(
     assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
 
 
+@mock_dynamodb
+def test_load_table_from_self_identifier(
+    _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+    catalog_name = "test_ddb_catalog"
+    identifier = (database_name, table_name)
+    test_catalog = DynamoDbCatalog(
+        catalog_name, **{"warehouse": f"s3://{BUCKET_NAME}", "py-io-impl": 
"pyiceberg.io.fsspec.FsspecFileIO"}
+    )
+    test_catalog.create_namespace(namespace=database_name)
+    test_catalog.create_table(identifier, table_schema_nested)
+    intermediate = test_catalog.load_table(identifier)
+    table = test_catalog.load_table(intermediate.identifier)
+    assert table.identifier == (catalog_name,) + identifier
+    assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+
+
 @mock_dynamodb
 def test_load_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None:
     identifier = (database_name, table_name)
@@ -203,6 +220,27 @@ def test_drop_table(
         test_catalog.load_table(identifier)
 
 
+@mock_dynamodb
+def test_drop_table_from_self_identifier(
+    _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+    catalog_name = "test_ddb_catalog"
+    identifier = (database_name, table_name)
+    test_catalog = DynamoDbCatalog(
+        catalog_name, **{"warehouse": f"s3://{BUCKET_NAME}", "py-io-impl": 
"pyiceberg.io.fsspec.FsspecFileIO"}
+    )
+    test_catalog.create_namespace(namespace=database_name)
+    test_catalog.create_table(identifier, table_schema_nested)
+    table = test_catalog.load_table(identifier)
+    assert table.identifier == (catalog_name,) + identifier
+    assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    test_catalog.drop_table(table.identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(table.identifier)
+
+
 @mock_dynamodb
 def test_drop_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None:
     identifier = (database_name, table_name)
@@ -236,6 +274,33 @@ def test_rename_table(
         test_catalog.load_table(identifier)
 
 
+@mock_dynamodb
+def test_rename_table_from_self_identifier(
+    _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+    catalog_name = "test_ddb_catalog"
+    new_table_name = f"{table_name}_new"
+    identifier = (database_name, table_name)
+    new_identifier = (database_name, new_table_name)
+    test_catalog = DynamoDbCatalog(
+        catalog_name, **{"warehouse": f"s3://{BUCKET_NAME}", "py-io-impl": 
"pyiceberg.io.fsspec.FsspecFileIO"}
+    )
+    test_catalog.create_namespace(namespace=database_name)
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    assert table.identifier == (catalog_name,) + identifier
+    assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    test_catalog.rename_table(table.identifier, new_identifier)
+    new_table = test_catalog.load_table(new_identifier)
+    assert new_table.identifier == (catalog_name,) + new_identifier
+    # the metadata_location should not change
+    assert new_table.metadata_location == table.metadata_location
+    # old table should be dropped
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(table.identifier)
+
+
 @mock_dynamodb
 def test_fail_on_rename_table_with_missing_required_params(
     _bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str
diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py
index 1d7027a..f182bf1 100644
--- a/tests/catalog/test_glue.py
+++ b/tests/catalog/test_glue.py
@@ -153,6 +153,22 @@ def test_load_table(
     assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
 
 
+@mock_glue
+def test_load_table_from_self_identifier(
+    _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+    catalog_name = "glue"
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog(
+        catalog_name, **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", 
"warehouse": f"s3://{BUCKET_NAME}/"}
+    )
+    test_catalog.create_namespace(namespace=database_name)
+    intermediate = test_catalog.create_table(identifier, table_schema_nested)
+    table = test_catalog.load_table(intermediate.identifier)
+    assert table.identifier == (catalog_name,) + identifier
+    assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+
+
 @mock_glue
 def test_load_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None:
     identifier = (database_name, table_name)
@@ -181,6 +197,27 @@ def test_drop_table(
         test_catalog.load_table(identifier)
 
 
+@mock_glue
+def test_drop_table_from_self_identifier(
+    _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+    catalog_name = "glue"
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog(
+        catalog_name, **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", 
"warehouse": f"s3://{BUCKET_NAME}/"}
+    )
+    test_catalog.create_namespace(namespace=database_name)
+    test_catalog.create_table(identifier, table_schema_nested)
+    table = test_catalog.load_table(identifier)
+    assert table.identifier == (catalog_name,) + identifier
+    assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    test_catalog.drop_table(table.identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(table.identifier)
+
+
 @mock_glue
 def test_drop_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None:
     identifier = (database_name, table_name)
@@ -212,6 +249,31 @@ def test_rename_table(
         test_catalog.load_table(identifier)
 
 
+@mock_glue
+def test_rename_table_from_self_identifier(
+    _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+    catalog_name = "glue"
+    new_table_name = f"{table_name}_new"
+    identifier = (database_name, table_name)
+    new_identifier = (database_name, new_table_name)
+    test_catalog = GlueCatalog("glue", **{"py-io-impl": 
"pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
+    test_catalog.create_namespace(namespace=database_name)
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    assert table.identifier == (catalog_name,) + identifier
+    assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+    test_catalog.rename_table(table.identifier, new_identifier)
+    new_table = test_catalog.load_table(new_identifier)
+    assert new_table.identifier == (catalog_name,) + new_identifier
+    # the metadata_location should not change
+    assert new_table.metadata_location == table.metadata_location
+    # old table should be dropped
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(table.identifier)
+
+
 @mock_glue
 def test_rename_table_no_params(_glue, _bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None:  # type: ignore
     new_database_name = f"{database_name}_new"
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py
index c280146..5433678 100644
--- a/tests/catalog/test_hive.py
+++ b/tests/catalog/test_hive.py
@@ -15,8 +15,9 @@
 #  specific language governing permissions and limitations
 #  under the License.
 # pylint: disable=protected-access,redefined-outer-name
+import copy
 import uuid
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, call, patch
 
 import pytest
 from hive_metastore.ttypes import (
@@ -394,6 +395,155 @@ def test_load_table(hive_table: HiveTable) -> None:
     assert expected == table.metadata
 
 
+def test_load_table_from_self_identifier(hive_table: HiveTable) -> None:
+    catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
+
+    catalog._client = MagicMock()
+    catalog._client.__enter__().get_table.return_value = hive_table
+    intermediate = catalog.load_table(("default", "new_tabl2e"))
+    table = catalog.load_table(intermediate.identifier)
+
+    catalog._client.__enter__().get_table.assert_called_with(dbname="default", 
tbl_name="new_tabl2e")
+
+    expected = TableMetadataV2(
+        location="s3://bucket/test/location",
+        table_uuid=uuid.UUID("9c12d441-03fe-4693-9a96-a0705ddf69c1"),
+        last_updated_ms=1602638573590,
+        last_column_id=3,
+        schemas=[
+            Schema(
+                NestedField(field_id=1, name="x", field_type=LongType(), 
required=True),
+                schema_id=0,
+                identifier_field_ids=[],
+            ),
+            Schema(
+                NestedField(field_id=1, name="x", field_type=LongType(), 
required=True),
+                NestedField(field_id=2, name="y", field_type=LongType(), 
required=True, doc="comment"),
+                NestedField(field_id=3, name="z", field_type=LongType(), 
required=True),
+                schema_id=1,
+                identifier_field_ids=[1, 2],
+            ),
+        ],
+        current_schema_id=1,
+        partition_specs=[
+            PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="x"), spec_id=0)
+        ],
+        default_spec_id=0,
+        last_partition_id=1000,
+        properties={"read.split.target.size": "134217728"},
+        current_snapshot_id=3055729675574597004,
+        snapshots=[
+            Snapshot(
+                snapshot_id=3051729675574597004,
+                parent_snapshot_id=None,
+                sequence_number=0,
+                timestamp_ms=1515100955770,
+                manifest_list="s3://a/b/1.avro",
+                summary=Summary(operation=Operation.APPEND),
+                schema_id=None,
+            ),
+            Snapshot(
+                snapshot_id=3055729675574597004,
+                parent_snapshot_id=3051729675574597004,
+                sequence_number=1,
+                timestamp_ms=1555100955770,
+                manifest_list="s3://a/b/2.avro",
+                summary=Summary(operation=Operation.APPEND),
+                schema_id=1,
+            ),
+        ],
+        snapshot_log=[
+            SnapshotLogEntry(snapshot_id=3051729675574597004, timestamp_ms=1515100955770),
+            SnapshotLogEntry(snapshot_id=3055729675574597004, timestamp_ms=1555100955770),
+        ],
+        metadata_log=[MetadataLogEntry(metadata_file="s3://bucket/.../v1.json", timestamp_ms=1515100)],
+        sort_orders=[
+            SortOrder(
+                SortField(
+                    source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST
+                ),
+                SortField(
+                    source_id=3,
+                    transform=BucketTransform(num_buckets=4),
+                    direction=SortDirection.DESC,
+                    null_order=NullOrder.NULLS_LAST,
+                ),
+                order_id=3,
+            )
+        ],
+        default_sort_order_id=3,
+        refs={
+            "test": SnapshotRef(
+                snapshot_id=3051729675574597004,
+                snapshot_ref_type=SnapshotRefType.TAG,
+                min_snapshots_to_keep=None,
+                max_snapshot_age_ms=None,
+                max_ref_age_ms=10000000,
+            ),
+            "main": SnapshotRef(
+                snapshot_id=3055729675574597004,
+                snapshot_ref_type=SnapshotRefType.BRANCH,
+                min_snapshots_to_keep=None,
+                max_snapshot_age_ms=None,
+                max_ref_age_ms=None,
+            ),
+        },
+        format_version=2,
+        last_sequence_number=34,
+    )
+
+    assert table.identifier == (HIVE_CATALOG_NAME, "default", "new_tabl2e")
+    assert expected == table.metadata
+
+
+def test_rename_table(hive_table: HiveTable) -> None:
+    catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
+
+    renamed_table = copy.deepcopy(hive_table)
+    renamed_table.dbName = "default"
+    renamed_table.tableName = "new_tabl3e"
+
+    catalog._client = MagicMock()
+    catalog._client.__enter__().get_table.side_effect = [hive_table, renamed_table]
+    catalog._client.__enter__().alter_table.return_value = None
+
+    from_identifier = ("default", "new_tabl2e")
+    to_identifier = ("default", "new_tabl3e")
+    table = catalog.rename_table(from_identifier, to_identifier)
+
+    assert table.identifier == ("hive",) + to_identifier
+
+    calls = [call(dbname="default", tbl_name="new_tabl2e"), 
call(dbname="default", tbl_name="new_tabl3e")]
+    catalog._client.__enter__().get_table.assert_has_calls(calls)
+    
catalog._client.__enter__().alter_table.assert_called_with(dbname="default", 
tbl_name="new_tabl2e", new_tbl=renamed_table)
+
+
+def test_rename_table_from_self_identifier(hive_table: HiveTable) -> None:
+    catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
+
+    catalog._client = MagicMock()
+    catalog._client.__enter__().get_table.return_value = hive_table
+
+    from_identifier = ("default", "new_tabl2e")
+    from_table = catalog.load_table(from_identifier)
+    catalog._client.__enter__().get_table.assert_called_with(dbname="default", 
tbl_name="new_tabl2e")
+
+    renamed_table = copy.deepcopy(hive_table)
+    renamed_table.dbName = "default"
+    renamed_table.tableName = "new_tabl3e"
+
+    catalog._client.__enter__().get_table.side_effect = [hive_table, renamed_table]
+    catalog._client.__enter__().alter_table.return_value = None
+    to_identifier = ("default", "new_tabl3e")
+    table = catalog.rename_table(from_table.identifier, to_identifier)
+
+    assert table.identifier == ("hive",) + to_identifier
+
+    calls = [call(dbname="default", tbl_name="new_tabl2e"), 
call(dbname="default", tbl_name="new_tabl3e")]
+    catalog._client.__enter__().get_table.assert_has_calls(calls)
+    
catalog._client.__enter__().alter_table.assert_called_with(dbname="default", 
tbl_name="new_tabl2e", new_tbl=renamed_table)
+
+
 def test_rename_table_from_does_not_exists() -> None:
     catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
 
@@ -489,6 +639,19 @@ def test_drop_table() -> None:
     catalog._client.__enter__().drop_table.assert_called_with(dbname="default", name="table", deleteData=False)
 
 
+def test_drop_table_from_self_identifier(hive_table: HiveTable) -> None:
+    catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
+
+    catalog._client = MagicMock()
+    catalog._client.__enter__().get_table.return_value = hive_table
+    table = catalog.load_table(("default", "new_tabl2e"))
+
+    catalog._client.__enter__().get_all_databases.return_value = ["namespace1", "namespace2"]
+    catalog.drop_table(table.identifier)
+
+    catalog._client.__enter__().drop_table.assert_called_with(dbname="default", name="new_tabl2e", deleteData=False)
+
+
 def test_drop_table_does_not_exists() -> None:
     catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
 
diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py
index 43313c0..79cf25f 100644
--- a/tests/catalog/test_rest.py
+++ b/tests/catalog/test_rest.py
@@ -16,9 +16,8 @@
 #  under the License.
 # pylint: disable=redefined-outer-name,unused-argument
 import os
-from typing import cast
+from typing import Any, Dict, cast
 from unittest import mock
-from uuid import UUID
 
 import pytest
 from requests_mock import Mocker
@@ -37,17 +36,9 @@ from pyiceberg.io import load_file_io
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table.metadata import TableMetadataV1
-from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
-from pyiceberg.table.snapshots import Operation, Snapshot, Summary
 from pyiceberg.table.sorting import SortField, SortOrder
 from pyiceberg.transforms import IdentityTransform, TruncateTransform
 from pyiceberg.typedef import RecursiveDict
-from pyiceberg.types import (
-    BooleanType,
-    IntegerType,
-    NestedField,
-    StringType,
-)
 from pyiceberg.utils.config import Config
 
 TEST_URI = "https://iceberg-test-catalog/";
@@ -64,6 +55,30 @@ OAUTH_TEST_HEADERS = {
 }
 
 
+@pytest.fixture
+def example_table_metadata_with_snapshot_v1_rest_json(example_table_metadata_with_snapshot_v1: Dict[str, Any]) -> Dict[str, Any]:
+    return {
+        "metadata-location": 
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
+        "metadata": example_table_metadata_with_snapshot_v1,
+        "config": {
+            "client.factory": 
"io.tabular.iceberg.catalog.TabularAwsClientFactory",
+            "region": "us-west-2",
+        },
+    }
+
+
+@pytest.fixture
+def example_table_metadata_no_snapshot_v1_rest_json(example_table_metadata_no_snapshot_v1: Dict[str, Any]) -> Dict[str, Any]:
+    return {
+        "metadata-location": "s3://warehouse/database/table/metadata.json",
+        "metadata": example_table_metadata_no_snapshot_v1,
+        "config": {
+            "client.factory": 
"io.tabular.iceberg.catalog.TabularAwsClientFactory",
+            "region": "us-west-2",
+        },
+    }
+
+
 @pytest.fixture
 def rest_mock(requests_mock: Mocker) -> Mocker:
     """Takes the default requests_mock and adds the config endpoint to it
@@ -339,77 +354,10 @@ def test_update_namespace_properties_404(rest_mock: Mocker) -> None:
     assert "Namespace does not exist" in str(e.value)
 
 
-def test_load_table_200(rest_mock: Mocker) -> None:
+def test_load_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None:
     rest_mock.get(
         f"{TEST_URI}v1/namespaces/fokko/tables/table",
-        json={
-            "metadata-location": 
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
-            "metadata": {
-                "format-version": 1,
-                "table-uuid": "b55d9dda-6561-423a-8bfc-787980ce421f",
-                "location": "s3://warehouse/database/table",
-                "last-updated-ms": 1646787054459,
-                "last-column-id": 2,
-                "schema": {
-                    "type": "struct",
-                    "schema-id": 0,
-                    "fields": [
-                        {"id": 1, "name": "id", "required": False, "type": 
"int"},
-                        {"id": 2, "name": "data", "required": False, "type": 
"string"},
-                    ],
-                },
-                "current-schema-id": 0,
-                "schemas": [
-                    {
-                        "type": "struct",
-                        "schema-id": 0,
-                        "fields": [
-                            {"id": 1, "name": "id", "required": False, "type": 
"int"},
-                            {"id": 2, "name": "data", "required": False, 
"type": "string"},
-                        ],
-                    }
-                ],
-                "partition-spec": [],
-                "default-spec-id": 0,
-                "partition-specs": [{"spec-id": 0, "fields": []}],
-                "last-partition-id": 999,
-                "default-sort-order-id": 0,
-                "sort-orders": [{"order-id": 0, "fields": []}],
-                "properties": {"owner": "bryan", 
"write.metadata.compression-codec": "gzip"},
-                "current-snapshot-id": 3497810964824022504,
-                "refs": {"main": {"snapshot-id": 3497810964824022504, "type": 
"branch"}},
-                "snapshots": [
-                    {
-                        "snapshot-id": 3497810964824022504,
-                        "timestamp-ms": 1646787054459,
-                        "summary": {
-                            "operation": "append",
-                            "spark.app.id": "local-1646787004168",
-                            "added-data-files": "1",
-                            "added-records": "1",
-                            "added-files-size": "697",
-                            "changed-partition-count": "1",
-                            "total-records": "1",
-                            "total-files-size": "697",
-                            "total-data-files": "1",
-                            "total-delete-files": "0",
-                            "total-position-deletes": "0",
-                            "total-equality-deletes": "0",
-                        },
-                        "manifest-list": 
"s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro",
-                        "schema-id": 0,
-                    }
-                ],
-                "snapshot-log": [{"timestamp-ms": 1646787054459, 
"snapshot-id": 3497810964824022504}],
-                "metadata-log": [
-                    {
-                        "timestamp-ms": 1646787031514,
-                        "metadata-file": 
"s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json",
-                    }
-                ],
-            },
-            "config": {"client.factory": 
"io.tabular.iceberg.catalog.TabularAwsClientFactory", "region": "us-west-2"},
-        },
+        json=example_table_metadata_with_snapshot_v1_rest_json,
         status_code=200,
         request_headers=TEST_HEADERS,
     )
@@ -417,78 +365,8 @@ def test_load_table_200(rest_mock: Mocker) -> None:
     actual = catalog.load_table(("fokko", "table"))
     expected = Table(
         identifier=("rest", "fokko", "table"),
-        
metadata_location="s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
-        metadata=TableMetadataV1(
-            location="s3://warehouse/database/table",
-            table_uuid=UUID("b55d9dda-6561-423a-8bfc-787980ce421f"),
-            last_updated_ms=1646787054459,
-            last_column_id=2,
-            schemas=[
-                Schema(
-                    NestedField(field_id=1, name="id", 
field_type=IntegerType(), required=False),
-                    NestedField(field_id=2, name="data", 
field_type=StringType(), required=False),
-                    schema_id=0,
-                    identifier_field_ids=[],
-                )
-            ],
-            current_schema_id=0,
-            default_spec_id=0,
-            last_partition_id=999,
-            properties={"owner": "bryan", "write.metadata.compression-codec": 
"gzip"},
-            current_snapshot_id=3497810964824022504,
-            snapshots=[
-                Snapshot(
-                    snapshot_id=3497810964824022504,
-                    parent_snapshot_id=None,
-                    sequence_number=None,
-                    timestamp_ms=1646787054459,
-                    manifest_list="s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro",
-                    summary=Summary(
-                        operation=Operation.APPEND,
-                        **{
-                            "spark.app.id": "local-1646787004168",
-                            "added-data-files": "1",
-                            "added-records": "1",
-                            "added-files-size": "697",
-                            "changed-partition-count": "1",
-                            "total-records": "1",
-                            "total-files-size": "697",
-                            "total-data-files": "1",
-                            "total-delete-files": "0",
-                            "total-position-deletes": "0",
-                            "total-equality-deletes": "0",
-                        },
-                    ),
-                    schema_id=0,
-                )
-            ],
-            snapshot_log=[{"timestamp-ms": 1646787054459, "snapshot-id": 
3497810964824022504}],
-            metadata_log=[
-                {
-                    "timestamp-ms": 1646787031514,
-                    "metadata-file": 
"s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json",
-                }
-            ],
-            sort_orders=[SortOrder(order_id=0)],
-            default_sort_order_id=0,
-            refs={
-                "main": SnapshotRef(
-                    snapshot_id=3497810964824022504,
-                    snapshot_ref_type=SnapshotRefType.BRANCH,
-                    min_snapshots_to_keep=None,
-                    max_snapshot_age_ms=None,
-                    max_ref_age_ms=None,
-                )
-            },
-            format_version=1,
-            schema_=Schema(
-                NestedField(field_id=1, name="id", field_type=IntegerType(), 
required=False),
-                NestedField(field_id=2, name="data", field_type=StringType(), 
required=False),
-                schema_id=0,
-                identifier_field_ids=[],
-            ),
-            partition_spec=[],
-        ),
+        metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
+        metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]),
         io=load_file_io(),
         catalog=catalog,
     )
@@ -497,6 +375,29 @@ def test_load_table_200(rest_mock: Mocker) -> None:
     assert actual == expected
 
 
+def test_load_table_from_self_identifier_200(
+    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
+) -> None:
+    rest_mock.get(
+        f"{TEST_URI}v1/namespaces/pdames/tables/table",
+        json=example_table_metadata_with_snapshot_v1_rest_json,
+        status_code=200,
+        request_headers=TEST_HEADERS,
+    )
+    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+    table = catalog.load_table(("pdames", "table"))
+    actual = catalog.load_table(table.identifier)
+    expected = Table(
+        identifier=("rest", "pdames", "table"),
+        
metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
+        
metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]),
+        io=load_file_io(),
+        catalog=catalog,
+    )
+    assert actual.metadata.model_dump() == expected.metadata.model_dump()
+    assert actual == expected
+
+
 def test_load_table_404(rest_mock: Mocker) -> None:
     rest_mock.get(
         f"{TEST_URI}v1/namespaces/fokko/tables/does_not_exists",
@@ -535,62 +436,12 @@ def test_drop_table_404(rest_mock: Mocker) -> None:
     assert "Table does not exist" in str(e.value)
 
 
-def test_create_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> None:
+def test_create_table_200(
+    rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any]
+) -> None:
     rest_mock.post(
         f"{TEST_URI}v1/namespaces/fokko/tables",
-        json={
-            "metadata-location": "s3://warehouse/database/table/metadata.json",
-            "metadata": {
-                "format-version": 1,
-                "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
-                "location": "s3://warehouse/database/table",
-                "last-updated-ms": 1657810967051,
-                "last-column-id": 3,
-                "schema": {
-                    "type": "struct",
-                    "schema-id": 0,
-                    "identifier-field-ids": [2],
-                    "fields": [
-                        {"id": 1, "name": "foo", "required": False, "type": 
"string"},
-                        {"id": 2, "name": "bar", "required": True, "type": 
"int"},
-                        {"id": 3, "name": "baz", "required": False, "type": 
"boolean"},
-                    ],
-                },
-                "current-schema-id": 0,
-                "schemas": [
-                    {
-                        "type": "struct",
-                        "schema-id": 0,
-                        "identifier-field-ids": [2],
-                        "fields": [
-                            {"id": 1, "name": "foo", "required": False, 
"type": "string"},
-                            {"id": 2, "name": "bar", "required": True, "type": 
"int"},
-                            {"id": 3, "name": "baz", "required": False, 
"type": "boolean"},
-                        ],
-                    }
-                ],
-                "partition-spec": [],
-                "default-spec-id": 0,
-                "last-partition-id": 999,
-                "default-sort-order-id": 0,
-                "sort-orders": [{"order-id": 0, "fields": []}],
-                "properties": {
-                    "write.delete.parquet.compression-codec": "zstd",
-                    "write.metadata.compression-codec": "gzip",
-                    "write.summary.partition-limit": "100",
-                    "write.parquet.compression-codec": "zstd",
-                },
-                "current-snapshot-id": -1,
-                "refs": {},
-                "snapshots": [],
-                "snapshot-log": [],
-                "metadata-log": [],
-            },
-            "config": {
-                "client.factory": 
"io.tabular.iceberg.catalog.TabularAwsClientFactory",
-                "region": "us-west-2",
-            },
-        },
+        json=example_table_metadata_no_snapshot_v1_rest_json,
         status_code=200,
         request_headers=TEST_HEADERS,
     )
@@ -607,47 +458,8 @@ def test_create_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> Non
     )
     expected = Table(
         identifier=("rest", "fokko", "fokko2"),
-        metadata_location="s3://warehouse/database/table/metadata.json",
-        metadata=TableMetadataV1(
-            location="s3://warehouse/database/table",
-            table_uuid=UUID("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
-            last_updated_ms=1657810967051,
-            last_column_id=3,
-            schemas=[
-                Schema(
-                    NestedField(field_id=1, name="foo", 
field_type=StringType(), required=False),
-                    NestedField(field_id=2, name="bar", 
field_type=IntegerType(), required=True),
-                    NestedField(field_id=3, name="baz", 
field_type=BooleanType(), required=False),
-                    schema_id=0,
-                    identifier_field_ids=[2],
-                )
-            ],
-            current_schema_id=0,
-            default_spec_id=0,
-            last_partition_id=999,
-            properties={
-                "write.delete.parquet.compression-codec": "zstd",
-                "write.metadata.compression-codec": "gzip",
-                "write.summary.partition-limit": "100",
-                "write.parquet.compression-codec": "zstd",
-            },
-            current_snapshot_id=None,
-            snapshots=[],
-            snapshot_log=[],
-            metadata_log=[],
-            sort_orders=[SortOrder(order_id=0)],
-            default_sort_order_id=0,
-            refs={},
-            format_version=1,
-            schema_=Schema(
-                NestedField(field_id=1, name="foo", field_type=StringType(), 
required=False),
-                NestedField(field_id=2, name="bar", field_type=IntegerType(), 
required=True),
-                NestedField(field_id=3, name="baz", field_type=BooleanType(), 
required=False),
-                schema_id=0,
-                identifier_field_ids=[2],
-            ),
-            partition_spec=[],
-        ),
+        metadata_location=example_table_metadata_no_snapshot_v1_rest_json["metadata-location"],
+        metadata=TableMetadataV1(**example_table_metadata_no_snapshot_v1_rest_json["metadata"]),
         io=load_file_io(),
         catalog=catalog,
     )
@@ -682,62 +494,12 @@ def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> Non
     assert "Table already exists" in str(e.value)
 
 
-def test_register_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> None:
+def test_register_table_200(
+    rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any]
+) -> None:
     rest_mock.post(
         f"{TEST_URI}v1/namespaces/default/register",
-        json={
-            "metadata-location": "s3://warehouse/database/table/metadata.json",
-            "metadata": {
-                "format-version": 1,
-                "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
-                "location": "s3://warehouse/database/table",
-                "last-updated-ms": 1657810967051,
-                "last-column-id": 3,
-                "schema": {
-                    "type": "struct",
-                    "schema-id": 0,
-                    "identifier-field-ids": [2],
-                    "fields": [
-                        {"id": 1, "name": "foo", "required": False, "type": 
"string"},
-                        {"id": 2, "name": "bar", "required": True, "type": 
"int"},
-                        {"id": 3, "name": "baz", "required": False, "type": 
"boolean"},
-                    ],
-                },
-                "current-schema-id": 0,
-                "schemas": [
-                    {
-                        "type": "struct",
-                        "schema-id": 0,
-                        "identifier-field-ids": [2],
-                        "fields": [
-                            {"id": 1, "name": "foo", "required": False, 
"type": "string"},
-                            {"id": 2, "name": "bar", "required": True, "type": 
"int"},
-                            {"id": 3, "name": "baz", "required": False, 
"type": "boolean"},
-                        ],
-                    }
-                ],
-                "partition-spec": [],
-                "default-spec-id": 0,
-                "last-partition-id": 999,
-                "default-sort-order-id": 0,
-                "sort-orders": [{"order-id": 0, "fields": []}],
-                "properties": {
-                    "write.delete.parquet.compression-codec": "zstd",
-                    "write.metadata.compression-codec": "gzip",
-                    "write.summary.partition-limit": "100",
-                    "write.parquet.compression-codec": "zstd",
-                },
-                "current-snapshot-id": -1,
-                "refs": {},
-                "snapshots": [],
-                "snapshot-log": [],
-                "metadata-log": [],
-            },
-            "config": {
-                "client.factory": 
"io.tabular.iceberg.catalog.TabularAwsClientFactory",
-                "region": "us-west-2",
-            },
-        },
+        json=example_table_metadata_no_snapshot_v1_rest_json,
         status_code=200,
         request_headers=TEST_HEADERS,
     )
@@ -747,47 +509,8 @@ def test_register_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> N
     )
     expected = Table(
         identifier=("rest", "default", "registered_table"),
-        metadata_location="s3://warehouse/database/table/metadata.json",
-        metadata=TableMetadataV1(
-            location="s3://warehouse/database/table",
-            table_uuid=UUID("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
-            last_updated_ms=1657810967051,
-            last_column_id=3,
-            schemas=[
-                Schema(
-                    NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
-                    NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
-                    NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
-                    schema_id=0,
-                    identifier_field_ids=[2],
-                )
-            ],
-            current_schema_id=0,
-            default_spec_id=0,
-            last_partition_id=999,
-            properties={
-                "write.delete.parquet.compression-codec": "zstd",
-                "write.metadata.compression-codec": "gzip",
-                "write.summary.partition-limit": "100",
-                "write.parquet.compression-codec": "zstd",
-            },
-            current_snapshot_id=None,
-            snapshots=[],
-            snapshot_log=[],
-            metadata_log=[],
-            sort_orders=[SortOrder(order_id=0)],
-            default_sort_order_id=0,
-            refs={},
-            format_version=1,
-            schema_=Schema(
-                NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
-                NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
-                NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
-                schema_id=0,
-                identifier_field_ids=[2],
-            ),
-            partition_spec=[],
-        ),
+        metadata_location=example_table_metadata_no_snapshot_v1_rest_json["metadata-location"],
+        metadata=TableMetadataV1(**example_table_metadata_no_snapshot_v1_rest_json["metadata"]),
         io=load_file_io(),
         catalog=catalog,
     )
@@ -839,6 +562,97 @@ def test_delete_table_204(rest_mock: Mocker) -> None:
     RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).drop_table(("example", 
"fokko"))
 
 
+def test_delete_table_from_self_identifier_204(
+    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
+) -> None:
+    rest_mock.get(
+        f"{TEST_URI}v1/namespaces/pdames/tables/table",
+        json=example_table_metadata_with_snapshot_v1_rest_json,
+        status_code=200,
+        request_headers=TEST_HEADERS,
+    )
+    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+    table = catalog.load_table(("pdames", "table"))
+    rest_mock.delete(
+        f"{TEST_URI}v1/namespaces/pdames/tables/table",
+        json={},
+        status_code=204,
+        request_headers=TEST_HEADERS,
+    )
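+    # load_table returns a catalog-qualified identifier, e.g.
+    # ("rest", "pdames", "table"); drop_table should accept that form and
+    # strip the leading catalog name before issuing the DELETE request.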
+    catalog.drop_table(table.identifier)
+
+
+def test_rename_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None:
+    rest_mock.post(
+        f"{TEST_URI}v1/tables/rename",
+        json={
+            "source": {"namespace": ("pdames",), "name": "source"},
+            "destination": {"namespace": ("pdames",), "name": "destination"},
+        },
+        status_code=200,
+        request_headers=TEST_HEADERS,
+    )
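+    # rename_table is expected to load the destination table once the rename
+    # POST succeeds, so the GET below supplies the destination's metadata.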
+    rest_mock.get(
+        f"{TEST_URI}v1/namespaces/pdames/tables/destination",
+        json=example_table_metadata_with_snapshot_v1_rest_json,
+        status_code=200,
+        request_headers=TEST_HEADERS,
+    )
+    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+    from_identifier = ("pdames", "source")
+    to_identifier = ("pdames", "destination")
+    actual = catalog.rename_table(from_identifier, to_identifier)
+    expected = Table(
+        identifier=("rest", "pdames", "destination"),
+        metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
+        metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]),
+        io=load_file_io(),
+        catalog=catalog,
+    )
+    assert actual.metadata.model_dump() == expected.metadata.model_dump()
+    assert actual == expected
+
+
+def test_rename_table_from_self_identifier_200(
+    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
+) -> None:
+    rest_mock.get(
+        f"{TEST_URI}v1/namespaces/pdames/tables/source",
+        json=example_table_metadata_with_snapshot_v1_rest_json,
+        status_code=200,
+        request_headers=TEST_HEADERS,
+    )
+    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+    from_identifier = ("pdames", "source")
+    to_identifier = ("pdames", "destination")
+    table = catalog.load_table(from_identifier)
+    rest_mock.post(
+        f"{TEST_URI}v1/tables/rename",
+        json={
+            "source": {"namespace": ("pdames",), "name": "source"},
+            "destination": {"namespace": ("pdames",), "name": "destination"},
+        },
+        status_code=200,
+        request_headers=TEST_HEADERS,
+    )
+    rest_mock.get(
+        f"{TEST_URI}v1/namespaces/pdames/tables/destination",
+        json=example_table_metadata_with_snapshot_v1_rest_json,
+        status_code=200,
+        request_headers=TEST_HEADERS,
+    )
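+    # Rename using the catalog-qualified identifier returned by load_table;
+    # the leading catalog name ("rest") should be dropped before the request.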
+    actual = catalog.rename_table(table.identifier, to_identifier)
+    expected = Table(
+        identifier=("rest", "pdames", "destination"),
+        metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
+        metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]),
+        io=load_file_io(),
+        catalog=catalog,
+    )
+    assert actual.metadata.model_dump() == expected.metadata.model_dump()
+    assert actual == expected
+
+
 def test_delete_table_404(rest_mock: Mocker) -> None:
     rest_mock.delete(
         f"{TEST_URI}v1/namespaces/example/tables/fokko",
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index 4277845..56d2c16 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -188,6 +188,20 @@ def test_load_table(test_catalog: SqlCatalog, table_schema_nested: Schema, rando
     assert table.metadata == loaded_table.metadata
 
 
+def test_load_table_from_self_identifier(
+    test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier
+) -> None:
+    database_name, _table_name = random_identifier
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(random_identifier, table_schema_nested)
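+    # A loaded table's identifier is prefixed with the catalog name; passing
+    # it back to load_table should resolve to the same table.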
+    intermediate = test_catalog.load_table(random_identifier)
+    assert intermediate.identifier == (test_catalog.name,) + random_identifier
+    loaded_table = test_catalog.load_table(intermediate.identifier)
+    assert table.identifier == loaded_table.identifier
+    assert table.metadata_location == loaded_table.metadata_location
+    assert table.metadata == loaded_table.metadata
+
+
 def test_drop_table(test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None:
     database_name, _table_name = random_identifier
     test_catalog.create_namespace(database_name)
@@ -198,6 +212,20 @@ def test_drop_table(test_catalog: SqlCatalog, table_schema_nested: Schema, rando
         test_catalog.load_table(random_identifier)
 
 
+def test_drop_table_from_self_identifier(
+    test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier
+) -> None:
+    database_name, _table_name = random_identifier
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(random_identifier, table_schema_nested)
+    assert table.identifier == (test_catalog.name,) + random_identifier
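+    # Drop using the catalog-qualified identifier; afterwards neither the
+    # qualified nor the bare identifier should resolve.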
+    test_catalog.drop_table(table.identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(table.identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(random_identifier)
+
+
 def test_drop_table_that_does_not_exist(test_catalog: SqlCatalog, random_identifier: Identifier) -> None:
     with pytest.raises(NoSuchTableError):
         test_catalog.drop_table(random_identifier)
@@ -220,6 +248,25 @@ def test_rename_table(
         test_catalog.load_table(random_identifier)
 
 
+def test_rename_table_from_self_identifier(
+    test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier
+) -> None:
+    from_database_name, _from_table_name = random_identifier
+    to_database_name, _to_table_name = another_random_identifier
+    test_catalog.create_namespace(from_database_name)
+    test_catalog.create_namespace(to_database_name)
+    table = test_catalog.create_table(random_identifier, table_schema_nested)
+    assert table.identifier == (test_catalog.name,) + random_identifier
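+    # Rename accepts the catalog-qualified source identifier; both forms of
+    # the old name should then raise NoSuchTableError.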
+    test_catalog.rename_table(table.identifier, another_random_identifier)
+    new_table = test_catalog.load_table(another_random_identifier)
+    assert new_table.identifier == (test_catalog.name,) + another_random_identifier
+    assert new_table.metadata_location == table.metadata_location
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(table.identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(random_identifier)
+
+
 def test_rename_table_to_existing_one(
     test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier
 ) -> None:
diff --git a/tests/conftest.py b/tests/conftest.py
index 4bd1921..72a7ad0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -354,6 +354,133 @@ def all_avro_types() -> Dict[str, Any]:
     }
 
 
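+# Reusable v1 table metadata with a single append snapshot, shared across the
+# catalog test suites via the fixture below.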
+EXAMPLE_TABLE_METADATA_WITH_SNAPSHOT_V1 = {
+    "format-version": 1,
+    "table-uuid": "b55d9dda-6561-423a-8bfc-787980ce421f",
+    "location": "s3://warehouse/database/table",
+    "last-updated-ms": 1646787054459,
+    "last-column-id": 2,
+    "schema": {
+        "type": "struct",
+        "schema-id": 0,
+        "fields": [
+            {"id": 1, "name": "id", "required": False, "type": "int"},
+            {"id": 2, "name": "data", "required": False, "type": "string"},
+        ],
+    },
+    "current-schema-id": 0,
+    "schemas": [
+        {
+            "type": "struct",
+            "schema-id": 0,
+            "fields": [
+                {"id": 1, "name": "id", "required": False, "type": "int"},
+                {"id": 2, "name": "data", "required": False, "type": "string"},
+            ],
+        }
+    ],
+    "partition-spec": [],
+    "default-spec-id": 0,
+    "partition-specs": [{"spec-id": 0, "fields": []}],
+    "last-partition-id": 999,
+    "default-sort-order-id": 0,
+    "sort-orders": [{"order-id": 0, "fields": []}],
+    "properties": {
+        "owner": "bryan",
+        "write.metadata.compression-codec": "gzip",
+    },
+    "current-snapshot-id": 3497810964824022504,
+    "refs": {"main": {"snapshot-id": 3497810964824022504, "type": "branch"}},
+    "snapshots": [
+        {
+            "snapshot-id": 3497810964824022504,
+            "timestamp-ms": 1646787054459,
+            "summary": {
+                "operation": "append",
+                "spark.app.id": "local-1646787004168",
+                "added-data-files": "1",
+                "added-records": "1",
+                "added-files-size": "697",
+                "changed-partition-count": "1",
+                "total-records": "1",
+                "total-files-size": "697",
+                "total-data-files": "1",
+                "total-delete-files": "0",
+                "total-position-deletes": "0",
+                "total-equality-deletes": "0",
+            },
+            "manifest-list": 
"s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro",
+            "schema-id": 0,
+        }
+    ],
+    "snapshot-log": [{"timestamp-ms": 1646787054459, "snapshot-id": 
3497810964824022504}],
+    "metadata-log": [
+        {
+            "timestamp-ms": 1646787031514,
+            "metadata-file": 
"s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json",
+        }
+    ],
+}
+
+
+@pytest.fixture
+def example_table_metadata_with_snapshot_v1() -> Dict[str, Any]:
+    return EXAMPLE_TABLE_METADATA_WITH_SNAPSHOT_V1
+
+
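+# v1 table metadata without snapshots: the payload formerly inlined in
+# test_register_table_200, now shared via the fixture below.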
+EXAMPLE_TABLE_METADATA_NO_SNAPSHOT_V1 = {
+    "format-version": 1,
+    "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
+    "location": "s3://warehouse/database/table",
+    "last-updated-ms": 1657810967051,
+    "last-column-id": 3,
+    "schema": {
+        "type": "struct",
+        "schema-id": 0,
+        "identifier-field-ids": [2],
+        "fields": [
+            {"id": 1, "name": "foo", "required": False, "type": "string"},
+            {"id": 2, "name": "bar", "required": True, "type": "int"},
+            {"id": 3, "name": "baz", "required": False, "type": "boolean"},
+        ],
+    },
+    "current-schema-id": 0,
+    "schemas": [
+        {
+            "type": "struct",
+            "schema-id": 0,
+            "identifier-field-ids": [2],
+            "fields": [
+                {"id": 1, "name": "foo", "required": False, "type": "string"},
+                {"id": 2, "name": "bar", "required": True, "type": "int"},
+                {"id": 3, "name": "baz", "required": False, "type": "boolean"},
+            ],
+        }
+    ],
+    "partition-spec": [],
+    "default-spec-id": 0,
+    "last-partition-id": 999,
+    "default-sort-order-id": 0,
+    "sort-orders": [{"order-id": 0, "fields": []}],
+    "properties": {
+        "write.delete.parquet.compression-codec": "zstd",
+        "write.metadata.compression-codec": "gzip",
+        "write.summary.partition-limit": "100",
+        "write.parquet.compression-codec": "zstd",
+    },
+    "current-snapshot-id": -1,
+    "refs": {},
+    "snapshots": [],
+    "snapshot-log": [],
+    "metadata-log": [],
+}
+
+
+@pytest.fixture
+def example_table_metadata_no_snapshot_v1() -> Dict[str, Any]:
+    return EXAMPLE_TABLE_METADATA_NO_SNAPSHOT_V1
+
+
 EXAMPLE_TABLE_METADATA_V2 = {
     "format-version": 2,
     "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
