This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new a57cb07 Support catalog in identifier to locate existing tables (#150)
a57cb07 is described below
commit a57cb0745efba8dd115d1089d70f4a390cd3e99a
Author: Patrick Ames <[email protected]>
AuthorDate: Sat Nov 25 12:51:11 2023 -0800
Support catalog in identifier to locate existing tables (#150)
---
pyiceberg/catalog/__init__.py | 19 +-
pyiceberg/catalog/dynamodb.py | 11 +-
pyiceberg/catalog/glue.py | 9 +-
pyiceberg/catalog/hive.py | 9 +-
pyiceberg/catalog/rest.py | 33 +--
pyiceberg/catalog/sql.py | 9 +-
tests/catalog/test_base.py | 53 ++++-
tests/catalog/test_dynamodb.py | 65 ++++++
tests/catalog/test_glue.py | 62 ++++++
tests/catalog/test_hive.py | 165 +++++++++++++-
tests/catalog/test_rest.py | 496 +++++++++++++----------------------------
tests/catalog/test_sql.py | 47 ++++
tests/conftest.py | 127 +++++++++++
13 files changed, 730 insertions(+), 375 deletions(-)
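Usage sketch (illustrative, not part of the patch): with this change, an identifier that already carries the catalog's name, such as the tuple returned by Table.identifier, is accepted by load_table, drop_table, rename_table and purge_table. The catalog name "prod" and the table names below are hypothetical examples.

    from pyiceberg.catalog import load_catalog

    catalog = load_catalog("prod")            # assumes a catalog named "prod" is configured
    table = catalog.load_table("db.events")   # plain namespace.table still works
    assert table.identifier == ("prod", "db", "events")

    # the self-identifier (catalog name included) can now be passed back in
    same_table = catalog.load_table(table.identifier)
    catalog.rename_table(table.identifier, "db.events_renamed")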
diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 2577a97..c83fd1c 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -536,6 +536,20 @@ class Catalog(ABC):
return tuple_identifier[0], tuple_identifier[1]
+ def identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
+ """Convert an identifier to a tuple and drop this catalog's name from the first element.
+
+ Args:
+ identifier (str | Identifier): Table identifier.
+
+ Returns:
+ Identifier: a tuple of strings with this catalog's name removed
+ """
+ identifier_tuple = Catalog.identifier_to_tuple(identifier)
+ if len(identifier_tuple) >= 3 and identifier_tuple[0] == self.name:
+ identifier_tuple = identifier_tuple[1:]
+ return identifier_tuple
+
def purge_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table and purge all data and metadata files.
@@ -547,8 +561,9 @@ class Catalog(ABC):
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
- table = self.load_table(identifier)
- self.drop_table(identifier)
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+ table = self.load_table(identifier_tuple)
+ self.drop_table(identifier_tuple)
io = load_file_io(self.properties, table.metadata_location)
metadata = table.metadata
manifest_lists_to_delete = set()
diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py
index 848ec03..3eee95d 100644
--- a/pyiceberg/catalog/dynamodb.py
+++ b/pyiceberg/catalog/dynamodb.py
@@ -213,7 +213,8 @@ class DynamoDbCatalog(Catalog):
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
- database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+ database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)
@@ -226,7 +227,8 @@ class DynamoDbCatalog(Catalog):
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
- database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+ database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
self._delete_dynamo_item(
@@ -256,7 +258,8 @@ class DynamoDbCatalog(Catalog):
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
- from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
+ from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
+ from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
from_table_item = self._get_iceberg_table_item(database_name=from_database_name, table_name=from_table_name)
@@ -287,7 +290,7 @@ class DynamoDbCatalog(Catalog):
raise TableAlreadyExistsError(f"Table
{to_database_name}.{to_table_name} already exists") from e
try:
- self.drop_table(from_identifier)
+ self.drop_table(from_identifier_tuple)
except (NoSuchTableError, GenericDynamoDbError) as e:
log_message = f"Failed to drop old table
{from_database_name}.{from_table_name}. "
diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index e068363..723de2f 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -265,7 +265,8 @@ class GlueCatalog(Catalog):
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
- database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+ database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
except self.glue.exceptions.EntityNotFoundException as e:
@@ -282,7 +283,8 @@ class GlueCatalog(Catalog):
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
- database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+ database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
self.glue.delete_table(DatabaseName=database_name, Name=table_name)
except self.glue.exceptions.EntityNotFoundException as e:
@@ -307,7 +309,8 @@ class GlueCatalog(Catalog):
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
- from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
+ from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
+ from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
get_table_response = self.glue.get_table(DatabaseName=from_database_name, Name=from_table_name)
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 21f1714..ffc9c53 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -347,7 +347,8 @@ class HiveCatalog(Catalog):
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
- database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+ database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
with self._client as open_client:
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
@@ -366,7 +367,8 @@ class HiveCatalog(Catalog):
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
- database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+ database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
with self._client as open_client:
open_client.drop_table(dbname=database_name, name=table_name, deleteData=False)
@@ -393,7 +395,8 @@ class HiveCatalog(Catalog):
NoSuchTableError: When a table with the name does not exist.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
- from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
+ from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
+ from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
with self._client as open_client:
diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py
index 77025b5..3dbb5b7 100644
--- a/pyiceberg/catalog/rest.py
+++ b/pyiceberg/catalog/rest.py
@@ -302,19 +302,20 @@ class RestCatalog(Catalog):
# Update URI based on overrides
self.uri = config[URI]
- def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties:
- if isinstance(identifier, TableIdentifier):
- return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name}
-
+ def _identifier_to_validated_tuple(self, identifier: Union[str, Identifier]) -> Identifier:
identifier_tuple = self.identifier_to_tuple(identifier)
if len(identifier_tuple) <= 1:
raise NoSuchTableError(f"Missing namespace or invalid identifier: {'.'.join(identifier_tuple)}")
+ return identifier_tuple
+
+ def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties:
+ if isinstance(identifier, TableIdentifier):
+ return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name}
+ identifier_tuple = self._identifier_to_validated_tuple(identifier)
return {"namespace": NAMESPACE_SEPARATOR.join(identifier_tuple[:-1]), "table": identifier_tuple[-1]}
def _split_identifier_for_json(self, identifier: Union[str, Identifier]) -> Dict[str, Union[Identifier, str]]:
- identifier_tuple = self.identifier_to_tuple(identifier)
- if len(identifier_tuple) <= 1:
- raise NoSuchTableError(f"Missing namespace or invalid identifier: {identifier_tuple}")
+ identifier_tuple = self._identifier_to_validated_tuple(identifier)
return {"namespace": identifier_tuple[:-1], "name": identifier_tuple[-1]}
def _handle_non_200_response(self, exc: HTTPError, error_handler: Dict[int, Type[Exception]]) -> None:
@@ -499,12 +500,10 @@ class RestCatalog(Catalog):
return [(*table.namespace, table.name) for table in ListTablesResponse(**response.json()).identifiers]
def load_table(self, identifier: Union[str, Identifier]) -> Table:
- identifier_tuple = self.identifier_to_tuple(identifier)
-
- if len(identifier_tuple) <= 1:
- raise NoSuchTableError(f"Missing namespace or invalid identifier:
{identifier}")
-
- response = self._session.get(self.url(Endpoints.load_table,
prefixed=True, **self._split_identifier_for_path(identifier)))
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+ response = self._session.get(
+ self.url(Endpoints.load_table, prefixed=True,
**self._split_identifier_for_path(identifier_tuple))
+ )
try:
response.raise_for_status()
except HTTPError as exc:
@@ -514,8 +513,11 @@ class RestCatalog(Catalog):
return self._response_to_table(identifier_tuple, table_response)
def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None:
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
response = self._session.delete(
- self.url(Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier)),
+ self.url(
+ Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier_tuple)
+ ),
)
try:
response.raise_for_status()
@@ -526,8 +528,9 @@ class RestCatalog(Catalog):
self.drop_table(identifier=identifier, purge_requested=True)
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
+ from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
payload = {
- "source": self._split_identifier_for_json(from_identifier),
+ "source": self._split_identifier_for_json(from_identifier_tuple),
"destination": self._split_identifier_for_json(to_identifier),
}
response = self._session.post(self.url(Endpoints.rename_table), json=payload)
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index bca0fe4..b0c01eb 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -231,7 +231,8 @@ class SqlCatalog(Catalog):
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
- database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+ database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
with Session(self.engine) as session:
stmt = select(IcebergTables).where(
IcebergTables.catalog_name == self.name,
@@ -252,7 +253,8 @@ class SqlCatalog(Catalog):
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
- database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
+ database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
with Session(self.engine) as session:
res = session.execute(
delete(IcebergTables).where(
@@ -280,7 +282,8 @@ class SqlCatalog(Catalog):
TableAlreadyExistsError: If a table with the new name already exist.
NoSuchNamespaceError: If the target namespace does not exist.
"""
- from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
+ from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
+ from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
if not self._namespace_exists(to_database_name):
raise NoSuchNamespaceError(f"Namespace does not exist: {to_database_name}")
diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py
index 1078dd1..c7cd1c4 100644
--- a/tests/catalog/test_base.py
+++ b/tests/catalog/test_base.py
@@ -149,14 +149,14 @@ class InMemoryCatalog(Catalog):
)
def load_table(self, identifier: Union[str, Identifier]) -> Table:
- identifier = Catalog.identifier_to_tuple(identifier)
+ identifier = self.identifier_to_tuple_without_catalog(identifier)
try:
return self.__tables[identifier]
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {identifier}") from
error
def drop_table(self, identifier: Union[str, Identifier]) -> None:
- identifier = Catalog.identifier_to_tuple(identifier)
+ identifier = self.identifier_to_tuple_without_catalog(identifier)
try:
self.__tables.pop(identifier)
except KeyError as error:
@@ -166,7 +166,7 @@ class InMemoryCatalog(Catalog):
self.drop_table(identifier)
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
- from_identifier = Catalog.identifier_to_tuple(from_identifier)
+ from_identifier = self.identifier_to_tuple_without_catalog(from_identifier)
try:
table = self.__tables.pop(from_identifier)
except KeyError as error:
@@ -352,6 +352,16 @@ def test_load_table(catalog: InMemoryCatalog) -> None:
assert table == given_table
+def test_load_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
+ # Given
+ given_table = given_catalog_has_a_table(catalog)
+ # When
+ intermediate = catalog.load_table(TEST_TABLE_IDENTIFIER)
+ table = catalog.load_table(intermediate.identifier)
+ # Then
+ assert table == given_table
+
+
def test_table_raises_error_on_table_not_found(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
@@ -367,6 +377,18 @@ def test_drop_table(catalog: InMemoryCatalog) -> None:
catalog.load_table(TEST_TABLE_IDENTIFIER)
+def test_drop_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
+ # Given
+ table = given_catalog_has_a_table(catalog)
+ # When
+ catalog.drop_table(table.identifier)
+ # Then
+ with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
+ catalog.load_table(table.identifier)
+ with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
+ catalog.load_table(TEST_TABLE_IDENTIFIER)
+
+
def test_drop_table_that_does_not_exist_raise_error(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
@@ -405,6 +427,31 @@ def test_rename_table(catalog: InMemoryCatalog) -> None:
catalog.load_table(TEST_TABLE_IDENTIFIER)
+def test_rename_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
+ # Given
+ table = given_catalog_has_a_table(catalog)
+
+ # When
+ new_table_name = "new.namespace.new_table"
+ new_table = catalog.rename_table(table.identifier, new_table_name)
+
+ # Then
+ assert new_table.identifier == Catalog.identifier_to_tuple(new_table_name)
+
+ # And
+ new_table = catalog.load_table(new_table.identifier)
+ assert new_table.identifier == Catalog.identifier_to_tuple(new_table_name)
+
+ # And
+ assert ("new", "namespace") in catalog.list_namespaces()
+
+ # And
+ with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
+ catalog.load_table(table.identifier)
+ with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
+ catalog.load_table(TEST_TABLE_IDENTIFIER)
+
+
def test_create_namespace(catalog: InMemoryCatalog) -> None:
# When
catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
diff --git a/tests/catalog/test_dynamodb.py b/tests/catalog/test_dynamodb.py
index 582cb03..f03d1d9 100644
--- a/tests/catalog/test_dynamodb.py
+++ b/tests/catalog/test_dynamodb.py
@@ -175,6 +175,23 @@ def test_load_table(
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+@mock_dynamodb
+def test_load_table_from_self_identifier(
+ _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+ catalog_name = "test_ddb_catalog"
+ identifier = (database_name, table_name)
+ test_catalog = DynamoDbCatalog(
+ catalog_name, **{"warehouse": f"s3://{BUCKET_NAME}", "py-io-impl":
"pyiceberg.io.fsspec.FsspecFileIO"}
+ )
+ test_catalog.create_namespace(namespace=database_name)
+ test_catalog.create_table(identifier, table_schema_nested)
+ intermediate = test_catalog.load_table(identifier)
+ table = test_catalog.load_table(intermediate.identifier)
+ assert table.identifier == (catalog_name,) + identifier
+ assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+
+
@mock_dynamodb
def test_load_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None:
identifier = (database_name, table_name)
@@ -203,6 +220,27 @@ def test_drop_table(
test_catalog.load_table(identifier)
+@mock_dynamodb
+def test_drop_table_from_self_identifier(
+ _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+ catalog_name = "test_ddb_catalog"
+ identifier = (database_name, table_name)
+ test_catalog = DynamoDbCatalog(
+ catalog_name, **{"warehouse": f"s3://{BUCKET_NAME}", "py-io-impl":
"pyiceberg.io.fsspec.FsspecFileIO"}
+ )
+ test_catalog.create_namespace(namespace=database_name)
+ test_catalog.create_table(identifier, table_schema_nested)
+ table = test_catalog.load_table(identifier)
+ assert table.identifier == (catalog_name,) + identifier
+ assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ test_catalog.drop_table(table.identifier)
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(identifier)
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(table.identifier)
+
+
@mock_dynamodb
def test_drop_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None:
identifier = (database_name, table_name)
@@ -236,6 +274,33 @@ def test_rename_table(
test_catalog.load_table(identifier)
+@mock_dynamodb
+def test_rename_table_from_self_identifier(
+ _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+ catalog_name = "test_ddb_catalog"
+ new_table_name = f"{table_name}_new"
+ identifier = (database_name, table_name)
+ new_identifier = (database_name, new_table_name)
+ test_catalog = DynamoDbCatalog(
+ catalog_name, **{"warehouse": f"s3://{BUCKET_NAME}", "py-io-impl":
"pyiceberg.io.fsspec.FsspecFileIO"}
+ )
+ test_catalog.create_namespace(namespace=database_name)
+ table = test_catalog.create_table(identifier, table_schema_nested)
+ assert table.identifier == (catalog_name,) + identifier
+ assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ test_catalog.rename_table(table.identifier, new_identifier)
+ new_table = test_catalog.load_table(new_identifier)
+ assert new_table.identifier == (catalog_name,) + new_identifier
+ # the metadata_location should not change
+ assert new_table.metadata_location == table.metadata_location
+ # old table should be dropped
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(identifier)
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(table.identifier)
+
+
@mock_dynamodb
def test_fail_on_rename_table_with_missing_required_params(
_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str
diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py
index 1d7027a..f182bf1 100644
--- a/tests/catalog/test_glue.py
+++ b/tests/catalog/test_glue.py
@@ -153,6 +153,22 @@ def test_load_table(
assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+@mock_glue
+def test_load_table_from_self_identifier(
+ _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+ catalog_name = "glue"
+ identifier = (database_name, table_name)
+ test_catalog = GlueCatalog(
+ catalog_name, **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
"warehouse": f"s3://{BUCKET_NAME}/"}
+ )
+ test_catalog.create_namespace(namespace=database_name)
+ intermediate = test_catalog.create_table(identifier, table_schema_nested)
+ table = test_catalog.load_table(intermediate.identifier)
+ assert table.identifier == (catalog_name,) + identifier
+ assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+
+
@mock_glue
def test_load_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None:
identifier = (database_name, table_name)
@@ -181,6 +197,27 @@ def test_drop_table(
test_catalog.load_table(identifier)
+@mock_glue
+def test_drop_table_from_self_identifier(
+ _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+ catalog_name = "glue"
+ identifier = (database_name, table_name)
+ test_catalog = GlueCatalog(
+ catalog_name, **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
"warehouse": f"s3://{BUCKET_NAME}/"}
+ )
+ test_catalog.create_namespace(namespace=database_name)
+ test_catalog.create_table(identifier, table_schema_nested)
+ table = test_catalog.load_table(identifier)
+ assert table.identifier == (catalog_name,) + identifier
+ assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ test_catalog.drop_table(table.identifier)
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(identifier)
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(table.identifier)
+
+
@mock_glue
def test_drop_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None:
identifier = (database_name, table_name)
@@ -212,6 +249,31 @@ def test_rename_table(
test_catalog.load_table(identifier)
+@mock_glue
+def test_rename_table_from_self_identifier(
+ _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str
+) -> None:
+ catalog_name = "glue"
+ new_table_name = f"{table_name}_new"
+ identifier = (database_name, table_name)
+ new_identifier = (database_name, new_table_name)
+ test_catalog = GlueCatalog("glue", **{"py-io-impl":
"pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"})
+ test_catalog.create_namespace(namespace=database_name)
+ table = test_catalog.create_table(identifier, table_schema_nested)
+ assert table.identifier == (catalog_name,) + identifier
+ assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
+ test_catalog.rename_table(table.identifier, new_identifier)
+ new_table = test_catalog.load_table(new_identifier)
+ assert new_table.identifier == (catalog_name,) + new_identifier
+ # the metadata_location should not change
+ assert new_table.metadata_location == table.metadata_location
+ # old table should be dropped
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(identifier)
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(table.identifier)
+
+
@mock_glue
def test_rename_table_no_params(_glue, _bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None: # type: ignore
new_database_name = f"{database_name}_new"
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py
index c280146..5433678 100644
--- a/tests/catalog/test_hive.py
+++ b/tests/catalog/test_hive.py
@@ -15,8 +15,9 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=protected-access,redefined-outer-name
+import copy
import uuid
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, call, patch
import pytest
from hive_metastore.ttypes import (
@@ -394,6 +395,155 @@ def test_load_table(hive_table: HiveTable) -> None:
assert expected == table.metadata
+def test_load_table_from_self_identifier(hive_table: HiveTable) -> None:
+ catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
+
+ catalog._client = MagicMock()
+ catalog._client.__enter__().get_table.return_value = hive_table
+ intermediate = catalog.load_table(("default", "new_tabl2e"))
+ table = catalog.load_table(intermediate.identifier)
+
+ catalog._client.__enter__().get_table.assert_called_with(dbname="default",
tbl_name="new_tabl2e")
+
+ expected = TableMetadataV2(
+ location="s3://bucket/test/location",
+ table_uuid=uuid.UUID("9c12d441-03fe-4693-9a96-a0705ddf69c1"),
+ last_updated_ms=1602638573590,
+ last_column_id=3,
+ schemas=[
+ Schema(
+ NestedField(field_id=1, name="x", field_type=LongType(),
required=True),
+ schema_id=0,
+ identifier_field_ids=[],
+ ),
+ Schema(
+ NestedField(field_id=1, name="x", field_type=LongType(),
required=True),
+ NestedField(field_id=2, name="y", field_type=LongType(),
required=True, doc="comment"),
+ NestedField(field_id=3, name="z", field_type=LongType(),
required=True),
+ schema_id=1,
+ identifier_field_ids=[1, 2],
+ ),
+ ],
+ current_schema_id=1,
+ partition_specs=[
+ PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="x"), spec_id=0)
+ ],
+ default_spec_id=0,
+ last_partition_id=1000,
+ properties={"read.split.target.size": "134217728"},
+ current_snapshot_id=3055729675574597004,
+ snapshots=[
+ Snapshot(
+ snapshot_id=3051729675574597004,
+ parent_snapshot_id=None,
+ sequence_number=0,
+ timestamp_ms=1515100955770,
+ manifest_list="s3://a/b/1.avro",
+ summary=Summary(operation=Operation.APPEND),
+ schema_id=None,
+ ),
+ Snapshot(
+ snapshot_id=3055729675574597004,
+ parent_snapshot_id=3051729675574597004,
+ sequence_number=1,
+ timestamp_ms=1555100955770,
+ manifest_list="s3://a/b/2.avro",
+ summary=Summary(operation=Operation.APPEND),
+ schema_id=1,
+ ),
+ ],
+ snapshot_log=[
+ SnapshotLogEntry(snapshot_id=3051729675574597004, timestamp_ms=1515100955770),
+ SnapshotLogEntry(snapshot_id=3055729675574597004, timestamp_ms=1555100955770),
+ ],
+ metadata_log=[MetadataLogEntry(metadata_file="s3://bucket/.../v1.json", timestamp_ms=1515100)],
+ sort_orders=[
+ SortOrder(
+ SortField(
+ source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST
+ ),
+ SortField(
+ source_id=3,
+ transform=BucketTransform(num_buckets=4),
+ direction=SortDirection.DESC,
+ null_order=NullOrder.NULLS_LAST,
+ ),
+ order_id=3,
+ )
+ ],
+ default_sort_order_id=3,
+ refs={
+ "test": SnapshotRef(
+ snapshot_id=3051729675574597004,
+ snapshot_ref_type=SnapshotRefType.TAG,
+ min_snapshots_to_keep=None,
+ max_snapshot_age_ms=None,
+ max_ref_age_ms=10000000,
+ ),
+ "main": SnapshotRef(
+ snapshot_id=3055729675574597004,
+ snapshot_ref_type=SnapshotRefType.BRANCH,
+ min_snapshots_to_keep=None,
+ max_snapshot_age_ms=None,
+ max_ref_age_ms=None,
+ ),
+ },
+ format_version=2,
+ last_sequence_number=34,
+ )
+
+ assert table.identifier == (HIVE_CATALOG_NAME, "default", "new_tabl2e")
+ assert expected == table.metadata
+
+
+def test_rename_table(hive_table: HiveTable) -> None:
+ catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
+
+ renamed_table = copy.deepcopy(hive_table)
+ renamed_table.dbName = "default"
+ renamed_table.tableName = "new_tabl3e"
+
+ catalog._client = MagicMock()
+ catalog._client.__enter__().get_table.side_effect = [hive_table, renamed_table]
+ catalog._client.__enter__().alter_table.return_value = None
+
+ from_identifier = ("default", "new_tabl2e")
+ to_identifier = ("default", "new_tabl3e")
+ table = catalog.rename_table(from_identifier, to_identifier)
+
+ assert table.identifier == ("hive",) + to_identifier
+
+ calls = [call(dbname="default", tbl_name="new_tabl2e"),
call(dbname="default", tbl_name="new_tabl3e")]
+ catalog._client.__enter__().get_table.assert_has_calls(calls)
+
catalog._client.__enter__().alter_table.assert_called_with(dbname="default",
tbl_name="new_tabl2e", new_tbl=renamed_table)
+
+
+def test_rename_table_from_self_identifier(hive_table: HiveTable) -> None:
+ catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
+
+ catalog._client = MagicMock()
+ catalog._client.__enter__().get_table.return_value = hive_table
+
+ from_identifier = ("default", "new_tabl2e")
+ from_table = catalog.load_table(from_identifier)
+ catalog._client.__enter__().get_table.assert_called_with(dbname="default",
tbl_name="new_tabl2e")
+
+ renamed_table = copy.deepcopy(hive_table)
+ renamed_table.dbName = "default"
+ renamed_table.tableName = "new_tabl3e"
+
+ catalog._client.__enter__().get_table.side_effect = [hive_table, renamed_table]
+ catalog._client.__enter__().alter_table.return_value = None
+ to_identifier = ("default", "new_tabl3e")
+ table = catalog.rename_table(from_table.identifier, to_identifier)
+
+ assert table.identifier == ("hive",) + to_identifier
+
+ calls = [call(dbname="default", tbl_name="new_tabl2e"),
call(dbname="default", tbl_name="new_tabl3e")]
+ catalog._client.__enter__().get_table.assert_has_calls(calls)
+
catalog._client.__enter__().alter_table.assert_called_with(dbname="default",
tbl_name="new_tabl2e", new_tbl=renamed_table)
+
+
def test_rename_table_from_does_not_exists() -> None:
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
@@ -489,6 +639,19 @@ def test_drop_table() -> None:
catalog._client.__enter__().drop_table.assert_called_with(dbname="default",
name="table", deleteData=False)
+def test_drop_table_from_self_identifier(hive_table: HiveTable) -> None:
+ catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
+
+ catalog._client = MagicMock()
+ catalog._client.__enter__().get_table.return_value = hive_table
+ table = catalog.load_table(("default", "new_tabl2e"))
+
+ catalog._client.__enter__().get_all_databases.return_value = ["namespace1", "namespace2"]
+ catalog.drop_table(table.identifier)
+
+ catalog._client.__enter__().drop_table.assert_called_with(dbname="default", name="new_tabl2e", deleteData=False)
+
+
def test_drop_table_does_not_exists() -> None:
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py
index 43313c0..79cf25f 100644
--- a/tests/catalog/test_rest.py
+++ b/tests/catalog/test_rest.py
@@ -16,9 +16,8 @@
# under the License.
# pylint: disable=redefined-outer-name,unused-argument
import os
-from typing import cast
+from typing import Any, Dict, cast
from unittest import mock
-from uuid import UUID
import pytest
from requests_mock import Mocker
@@ -37,17 +36,9 @@ from pyiceberg.io import load_file_io
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import TableMetadataV1
-from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
-from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import IdentityTransform, TruncateTransform
from pyiceberg.typedef import RecursiveDict
-from pyiceberg.types import (
- BooleanType,
- IntegerType,
- NestedField,
- StringType,
-)
from pyiceberg.utils.config import Config
TEST_URI = "https://iceberg-test-catalog/"
@@ -64,6 +55,30 @@ OAUTH_TEST_HEADERS = {
}
+@pytest.fixture
+def example_table_metadata_with_snapshot_v1_rest_json(example_table_metadata_with_snapshot_v1: Dict[str, Any]) -> Dict[str, Any]:
+ return {
+ "metadata-location":
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
+ "metadata": example_table_metadata_with_snapshot_v1,
+ "config": {
+ "client.factory":
"io.tabular.iceberg.catalog.TabularAwsClientFactory",
+ "region": "us-west-2",
+ },
+ }
+
+
+@pytest.fixture
+def example_table_metadata_no_snapshot_v1_rest_json(example_table_metadata_no_snapshot_v1: Dict[str, Any]) -> Dict[str, Any]:
+ return {
+ "metadata-location": "s3://warehouse/database/table/metadata.json",
+ "metadata": example_table_metadata_no_snapshot_v1,
+ "config": {
+ "client.factory":
"io.tabular.iceberg.catalog.TabularAwsClientFactory",
+ "region": "us-west-2",
+ },
+ }
+
+
@pytest.fixture
def rest_mock(requests_mock: Mocker) -> Mocker:
"""Takes the default requests_mock and adds the config endpoint to it
@@ -339,77 +354,10 @@ def test_update_namespace_properties_404(rest_mock: Mocker) -> None:
assert "Namespace does not exist" in str(e.value)
-def test_load_table_200(rest_mock: Mocker) -> None:
+def test_load_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None:
rest_mock.get(
f"{TEST_URI}v1/namespaces/fokko/tables/table",
- json={
- "metadata-location":
"s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
- "metadata": {
- "format-version": 1,
- "table-uuid": "b55d9dda-6561-423a-8bfc-787980ce421f",
- "location": "s3://warehouse/database/table",
- "last-updated-ms": 1646787054459,
- "last-column-id": 2,
- "schema": {
- "type": "struct",
- "schema-id": 0,
- "fields": [
- {"id": 1, "name": "id", "required": False, "type":
"int"},
- {"id": 2, "name": "data", "required": False, "type":
"string"},
- ],
- },
- "current-schema-id": 0,
- "schemas": [
- {
- "type": "struct",
- "schema-id": 0,
- "fields": [
- {"id": 1, "name": "id", "required": False, "type":
"int"},
- {"id": 2, "name": "data", "required": False,
"type": "string"},
- ],
- }
- ],
- "partition-spec": [],
- "default-spec-id": 0,
- "partition-specs": [{"spec-id": 0, "fields": []}],
- "last-partition-id": 999,
- "default-sort-order-id": 0,
- "sort-orders": [{"order-id": 0, "fields": []}],
- "properties": {"owner": "bryan",
"write.metadata.compression-codec": "gzip"},
- "current-snapshot-id": 3497810964824022504,
- "refs": {"main": {"snapshot-id": 3497810964824022504, "type":
"branch"}},
- "snapshots": [
- {
- "snapshot-id": 3497810964824022504,
- "timestamp-ms": 1646787054459,
- "summary": {
- "operation": "append",
- "spark.app.id": "local-1646787004168",
- "added-data-files": "1",
- "added-records": "1",
- "added-files-size": "697",
- "changed-partition-count": "1",
- "total-records": "1",
- "total-files-size": "697",
- "total-data-files": "1",
- "total-delete-files": "0",
- "total-position-deletes": "0",
- "total-equality-deletes": "0",
- },
- "manifest-list":
"s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro",
- "schema-id": 0,
- }
- ],
- "snapshot-log": [{"timestamp-ms": 1646787054459,
"snapshot-id": 3497810964824022504}],
- "metadata-log": [
- {
- "timestamp-ms": 1646787031514,
- "metadata-file":
"s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json",
- }
- ],
- },
- "config": {"client.factory":
"io.tabular.iceberg.catalog.TabularAwsClientFactory", "region": "us-west-2"},
- },
+ json=example_table_metadata_with_snapshot_v1_rest_json,
status_code=200,
request_headers=TEST_HEADERS,
)
@@ -417,78 +365,8 @@ def test_load_table_200(rest_mock: Mocker) -> None:
actual = catalog.load_table(("fokko", "table"))
expected = Table(
identifier=("rest", "fokko", "table"),
- metadata_location="s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json",
- metadata=TableMetadataV1(
- location="s3://warehouse/database/table",
- table_uuid=UUID("b55d9dda-6561-423a-8bfc-787980ce421f"),
- last_updated_ms=1646787054459,
- last_column_id=2,
- schemas=[
- Schema(
- NestedField(field_id=1, name="id",
field_type=IntegerType(), required=False),
- NestedField(field_id=2, name="data",
field_type=StringType(), required=False),
- schema_id=0,
- identifier_field_ids=[],
- )
- ],
- current_schema_id=0,
- default_spec_id=0,
- last_partition_id=999,
- properties={"owner": "bryan", "write.metadata.compression-codec":
"gzip"},
- current_snapshot_id=3497810964824022504,
- snapshots=[
- Snapshot(
- snapshot_id=3497810964824022504,
- parent_snapshot_id=None,
- sequence_number=None,
- timestamp_ms=1646787054459,
- manifest_list="s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro",
- summary=Summary(
- operation=Operation.APPEND,
- **{
- "spark.app.id": "local-1646787004168",
- "added-data-files": "1",
- "added-records": "1",
- "added-files-size": "697",
- "changed-partition-count": "1",
- "total-records": "1",
- "total-files-size": "697",
- "total-data-files": "1",
- "total-delete-files": "0",
- "total-position-deletes": "0",
- "total-equality-deletes": "0",
- },
- ),
- schema_id=0,
- )
- ],
- snapshot_log=[{"timestamp-ms": 1646787054459, "snapshot-id":
3497810964824022504}],
- metadata_log=[
- {
- "timestamp-ms": 1646787031514,
- "metadata-file":
"s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json",
- }
- ],
- sort_orders=[SortOrder(order_id=0)],
- default_sort_order_id=0,
- refs={
- "main": SnapshotRef(
- snapshot_id=3497810964824022504,
- snapshot_ref_type=SnapshotRefType.BRANCH,
- min_snapshots_to_keep=None,
- max_snapshot_age_ms=None,
- max_ref_age_ms=None,
- )
- },
- format_version=1,
- schema_=Schema(
- NestedField(field_id=1, name="id", field_type=IntegerType(),
required=False),
- NestedField(field_id=2, name="data", field_type=StringType(),
required=False),
- schema_id=0,
- identifier_field_ids=[],
- ),
- partition_spec=[],
- ),
+ metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
+ metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]),
io=load_file_io(),
catalog=catalog,
)
@@ -497,6 +375,29 @@ def test_load_table_200(rest_mock: Mocker) -> None:
assert actual == expected
+def test_load_table_from_self_identifier_200(
+ rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
+) -> None:
+ rest_mock.get(
+ f"{TEST_URI}v1/namespaces/pdames/tables/table",
+ json=example_table_metadata_with_snapshot_v1_rest_json,
+ status_code=200,
+ request_headers=TEST_HEADERS,
+ )
+ catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+ table = catalog.load_table(("pdames", "table"))
+ actual = catalog.load_table(table.identifier)
+ expected = Table(
+ identifier=("rest", "pdames", "table"),
+ metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
+ metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]),
+ io=load_file_io(),
+ catalog=catalog,
+ )
+ assert actual.metadata.model_dump() == expected.metadata.model_dump()
+ assert actual == expected
+
+
def test_load_table_404(rest_mock: Mocker) -> None:
rest_mock.get(
f"{TEST_URI}v1/namespaces/fokko/tables/does_not_exists",
@@ -535,62 +436,12 @@ def test_drop_table_404(rest_mock: Mocker) -> None:
assert "Table does not exist" in str(e.value)
-def test_create_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> None:
+def test_create_table_200(
+ rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any]
+) -> None:
rest_mock.post(
f"{TEST_URI}v1/namespaces/fokko/tables",
- json={
- "metadata-location": "s3://warehouse/database/table/metadata.json",
- "metadata": {
- "format-version": 1,
- "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
- "location": "s3://warehouse/database/table",
- "last-updated-ms": 1657810967051,
- "last-column-id": 3,
- "schema": {
- "type": "struct",
- "schema-id": 0,
- "identifier-field-ids": [2],
- "fields": [
- {"id": 1, "name": "foo", "required": False, "type":
"string"},
- {"id": 2, "name": "bar", "required": True, "type":
"int"},
- {"id": 3, "name": "baz", "required": False, "type":
"boolean"},
- ],
- },
- "current-schema-id": 0,
- "schemas": [
- {
- "type": "struct",
- "schema-id": 0,
- "identifier-field-ids": [2],
- "fields": [
- {"id": 1, "name": "foo", "required": False,
"type": "string"},
- {"id": 2, "name": "bar", "required": True, "type":
"int"},
- {"id": 3, "name": "baz", "required": False,
"type": "boolean"},
- ],
- }
- ],
- "partition-spec": [],
- "default-spec-id": 0,
- "last-partition-id": 999,
- "default-sort-order-id": 0,
- "sort-orders": [{"order-id": 0, "fields": []}],
- "properties": {
- "write.delete.parquet.compression-codec": "zstd",
- "write.metadata.compression-codec": "gzip",
- "write.summary.partition-limit": "100",
- "write.parquet.compression-codec": "zstd",
- },
- "current-snapshot-id": -1,
- "refs": {},
- "snapshots": [],
- "snapshot-log": [],
- "metadata-log": [],
- },
- "config": {
- "client.factory":
"io.tabular.iceberg.catalog.TabularAwsClientFactory",
- "region": "us-west-2",
- },
- },
+ json=example_table_metadata_no_snapshot_v1_rest_json,
status_code=200,
request_headers=TEST_HEADERS,
)
@@ -607,47 +458,8 @@ def test_create_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> Non
)
expected = Table(
identifier=("rest", "fokko", "fokko2"),
- metadata_location="s3://warehouse/database/table/metadata.json",
- metadata=TableMetadataV1(
- location="s3://warehouse/database/table",
- table_uuid=UUID("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
- last_updated_ms=1657810967051,
- last_column_id=3,
- schemas=[
- Schema(
- NestedField(field_id=1, name="foo",
field_type=StringType(), required=False),
- NestedField(field_id=2, name="bar",
field_type=IntegerType(), required=True),
- NestedField(field_id=3, name="baz",
field_type=BooleanType(), required=False),
- schema_id=0,
- identifier_field_ids=[2],
- )
- ],
- current_schema_id=0,
- default_spec_id=0,
- last_partition_id=999,
- properties={
- "write.delete.parquet.compression-codec": "zstd",
- "write.metadata.compression-codec": "gzip",
- "write.summary.partition-limit": "100",
- "write.parquet.compression-codec": "zstd",
- },
- current_snapshot_id=None,
- snapshots=[],
- snapshot_log=[],
- metadata_log=[],
- sort_orders=[SortOrder(order_id=0)],
- default_sort_order_id=0,
- refs={},
- format_version=1,
- schema_=Schema(
- NestedField(field_id=1, name="foo", field_type=StringType(),
required=False),
- NestedField(field_id=2, name="bar", field_type=IntegerType(),
required=True),
- NestedField(field_id=3, name="baz", field_type=BooleanType(),
required=False),
- schema_id=0,
- identifier_field_ids=[2],
- ),
- partition_spec=[],
- ),
+ metadata_location=example_table_metadata_no_snapshot_v1_rest_json["metadata-location"],
+ metadata=TableMetadataV1(**example_table_metadata_no_snapshot_v1_rest_json["metadata"]),
io=load_file_io(),
catalog=catalog,
)
@@ -682,62 +494,12 @@ def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> Non
assert "Table already exists" in str(e.value)
-def test_register_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> None:
+def test_register_table_200(
+ rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any]
+) -> None:
rest_mock.post(
f"{TEST_URI}v1/namespaces/default/register",
- json={
- "metadata-location": "s3://warehouse/database/table/metadata.json",
- "metadata": {
- "format-version": 1,
- "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
- "location": "s3://warehouse/database/table",
- "last-updated-ms": 1657810967051,
- "last-column-id": 3,
- "schema": {
- "type": "struct",
- "schema-id": 0,
- "identifier-field-ids": [2],
- "fields": [
- {"id": 1, "name": "foo", "required": False, "type":
"string"},
- {"id": 2, "name": "bar", "required": True, "type":
"int"},
- {"id": 3, "name": "baz", "required": False, "type":
"boolean"},
- ],
- },
- "current-schema-id": 0,
- "schemas": [
- {
- "type": "struct",
- "schema-id": 0,
- "identifier-field-ids": [2],
- "fields": [
- {"id": 1, "name": "foo", "required": False,
"type": "string"},
- {"id": 2, "name": "bar", "required": True, "type":
"int"},
- {"id": 3, "name": "baz", "required": False,
"type": "boolean"},
- ],
- }
- ],
- "partition-spec": [],
- "default-spec-id": 0,
- "last-partition-id": 999,
- "default-sort-order-id": 0,
- "sort-orders": [{"order-id": 0, "fields": []}],
- "properties": {
- "write.delete.parquet.compression-codec": "zstd",
- "write.metadata.compression-codec": "gzip",
- "write.summary.partition-limit": "100",
- "write.parquet.compression-codec": "zstd",
- },
- "current-snapshot-id": -1,
- "refs": {},
- "snapshots": [],
- "snapshot-log": [],
- "metadata-log": [],
- },
- "config": {
- "client.factory":
"io.tabular.iceberg.catalog.TabularAwsClientFactory",
- "region": "us-west-2",
- },
- },
+ json=example_table_metadata_no_snapshot_v1_rest_json,
status_code=200,
request_headers=TEST_HEADERS,
)
@@ -747,47 +509,8 @@ def test_register_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> N
)
expected = Table(
identifier=("rest", "default", "registered_table"),
- metadata_location="s3://warehouse/database/table/metadata.json",
- metadata=TableMetadataV1(
- location="s3://warehouse/database/table",
- table_uuid=UUID("bf289591-dcc0-4234-ad4f-5c3eed811a29"),
- last_updated_ms=1657810967051,
- last_column_id=3,
- schemas=[
- Schema(
- NestedField(field_id=1, name="foo",
field_type=StringType(), required=False),
- NestedField(field_id=2, name="bar",
field_type=IntegerType(), required=True),
- NestedField(field_id=3, name="baz",
field_type=BooleanType(), required=False),
- schema_id=0,
- identifier_field_ids=[2],
- )
- ],
- current_schema_id=0,
- default_spec_id=0,
- last_partition_id=999,
- properties={
- "write.delete.parquet.compression-codec": "zstd",
- "write.metadata.compression-codec": "gzip",
- "write.summary.partition-limit": "100",
- "write.parquet.compression-codec": "zstd",
- },
- current_snapshot_id=None,
- snapshots=[],
- snapshot_log=[],
- metadata_log=[],
- sort_orders=[SortOrder(order_id=0)],
- default_sort_order_id=0,
- refs={},
- format_version=1,
- schema_=Schema(
- NestedField(field_id=1, name="foo", field_type=StringType(),
required=False),
- NestedField(field_id=2, name="bar", field_type=IntegerType(),
required=True),
- NestedField(field_id=3, name="baz", field_type=BooleanType(),
required=False),
- schema_id=0,
- identifier_field_ids=[2],
- ),
- partition_spec=[],
- ),
+ metadata_location=example_table_metadata_no_snapshot_v1_rest_json["metadata-location"],
+ metadata=TableMetadataV1(**example_table_metadata_no_snapshot_v1_rest_json["metadata"]),
io=load_file_io(),
catalog=catalog,
)
@@ -839,6 +562,97 @@ def test_delete_table_204(rest_mock: Mocker) -> None:
RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).drop_table(("example",
"fokko"))
+def test_delete_table_from_self_identifier_204(
+ rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
+) -> None:
+ rest_mock.get(
+ f"{TEST_URI}v1/namespaces/pdames/tables/table",
+ json=example_table_metadata_with_snapshot_v1_rest_json,
+ status_code=200,
+ request_headers=TEST_HEADERS,
+ )
+ catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+ table = catalog.load_table(("pdames", "table"))
+ rest_mock.delete(
+ f"{TEST_URI}v1/namespaces/pdames/tables/table",
+ json={},
+ status_code=204,
+ request_headers=TEST_HEADERS,
+ )
+ catalog.drop_table(table.identifier)
+
+
+def test_rename_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None:
+ rest_mock.post(
+ f"{TEST_URI}v1/tables/rename",
+ json={
+ "source": {"namespace": ("pdames",), "name": "source"},
+ "destination": {"namespace": ("pdames",), "name": "destination"},
+ },
+ status_code=200,
+ request_headers=TEST_HEADERS,
+ )
+ rest_mock.get(
+ f"{TEST_URI}v1/namespaces/pdames/tables/destination",
+ json=example_table_metadata_with_snapshot_v1_rest_json,
+ status_code=200,
+ request_headers=TEST_HEADERS,
+ )
+ catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+ from_identifier = ("pdames", "source")
+ to_identifier = ("pdames", "destination")
+ actual = catalog.rename_table(from_identifier, to_identifier)
+ expected = Table(
+ identifier=("rest", "pdames", "destination"),
+ metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
+ metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]),
+ io=load_file_io(),
+ catalog=catalog,
+ )
+ assert actual.metadata.model_dump() == expected.metadata.model_dump()
+ assert actual == expected
+
+
+def test_rename_table_from_self_identifier_200(
+ rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
+) -> None:
+ rest_mock.get(
+ f"{TEST_URI}v1/namespaces/pdames/tables/source",
+ json=example_table_metadata_with_snapshot_v1_rest_json,
+ status_code=200,
+ request_headers=TEST_HEADERS,
+ )
+ catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
+ from_identifier = ("pdames", "source")
+ to_identifier = ("pdames", "destination")
+ table = catalog.load_table(from_identifier)
+ rest_mock.post(
+ f"{TEST_URI}v1/tables/rename",
+ json={
+ "source": {"namespace": ("pdames",), "name": "source"},
+ "destination": {"namespace": ("pdames",), "name": "destination"},
+ },
+ status_code=200,
+ request_headers=TEST_HEADERS,
+ )
+ rest_mock.get(
+ f"{TEST_URI}v1/namespaces/pdames/tables/destination",
+ json=example_table_metadata_with_snapshot_v1_rest_json,
+ status_code=200,
+ request_headers=TEST_HEADERS,
+ )
+ actual = catalog.rename_table(table.identifier, to_identifier)
+ expected = Table(
+ identifier=("rest", "pdames", "destination"),
+ metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
+ metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]),
+ io=load_file_io(),
+ catalog=catalog,
+ )
+ assert actual.metadata.model_dump() == expected.metadata.model_dump()
+ assert actual == expected
+
+
def test_delete_table_404(rest_mock: Mocker) -> None:
rest_mock.delete(
f"{TEST_URI}v1/namespaces/example/tables/fokko",
diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py
index 4277845..56d2c16 100644
--- a/tests/catalog/test_sql.py
+++ b/tests/catalog/test_sql.py
@@ -188,6 +188,20 @@ def test_load_table(test_catalog: SqlCatalog, table_schema_nested: Schema, rando
assert table.metadata == loaded_table.metadata
+def test_load_table_from_self_identifier(
+ test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier
+) -> None:
+ database_name, _table_name = random_identifier
+ test_catalog.create_namespace(database_name)
+ table = test_catalog.create_table(random_identifier, table_schema_nested)
+ intermediate = test_catalog.load_table(random_identifier)
+ assert intermediate.identifier == (test_catalog.name,) + random_identifier
+ loaded_table = test_catalog.load_table(intermediate.identifier)
+ assert table.identifier == loaded_table.identifier
+ assert table.metadata_location == loaded_table.metadata_location
+ assert table.metadata == loaded_table.metadata
+
+
def test_drop_table(test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None:
database_name, _table_name = random_identifier
test_catalog.create_namespace(database_name)
@@ -198,6 +212,20 @@ def test_drop_table(test_catalog: SqlCatalog, table_schema_nested: Schema, rando
test_catalog.load_table(random_identifier)
+def test_drop_table_from_self_identifier(
+ test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier
+) -> None:
+ database_name, _table_name = random_identifier
+ test_catalog.create_namespace(database_name)
+ table = test_catalog.create_table(random_identifier, table_schema_nested)
+ assert table.identifier == (test_catalog.name,) + random_identifier
+ test_catalog.drop_table(table.identifier)
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(table.identifier)
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(random_identifier)
+
+
def test_drop_table_that_does_not_exist(test_catalog: SqlCatalog, random_identifier: Identifier) -> None:
with pytest.raises(NoSuchTableError):
test_catalog.drop_table(random_identifier)
@@ -220,6 +248,25 @@ def test_rename_table(
test_catalog.load_table(random_identifier)
+def test_rename_table_from_self_identifier(
+ test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier
+) -> None:
+ from_database_name, _from_table_name = random_identifier
+ to_database_name, _to_table_name = another_random_identifier
+ test_catalog.create_namespace(from_database_name)
+ test_catalog.create_namespace(to_database_name)
+ table = test_catalog.create_table(random_identifier, table_schema_nested)
+ assert table.identifier == (test_catalog.name,) + random_identifier
+ test_catalog.rename_table(table.identifier, another_random_identifier)
+ new_table = test_catalog.load_table(another_random_identifier)
+ assert new_table.identifier == (test_catalog.name,) + another_random_identifier
+ assert new_table.metadata_location == table.metadata_location
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(table.identifier)
+ with pytest.raises(NoSuchTableError):
+ test_catalog.load_table(random_identifier)
+
+
def test_rename_table_to_existing_one(
test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier
) -> None:
diff --git a/tests/conftest.py b/tests/conftest.py
index 4bd1921..72a7ad0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -354,6 +354,133 @@ def all_avro_types() -> Dict[str, Any]:
}
+EXAMPLE_TABLE_METADATA_WITH_SNAPSHOT_V1 = {
+ "format-version": 1,
+ "table-uuid": "b55d9dda-6561-423a-8bfc-787980ce421f",
+ "location": "s3://warehouse/database/table",
+ "last-updated-ms": 1646787054459,
+ "last-column-id": 2,
+ "schema": {
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {"id": 1, "name": "id", "required": False, "type": "int"},
+ {"id": 2, "name": "data", "required": False, "type": "string"},
+ ],
+ },
+ "current-schema-id": 0,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {"id": 1, "name": "id", "required": False, "type": "int"},
+ {"id": 2, "name": "data", "required": False, "type": "string"},
+ ],
+ }
+ ],
+ "partition-spec": [],
+ "default-spec-id": 0,
+ "partition-specs": [{"spec-id": 0, "fields": []}],
+ "last-partition-id": 999,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "properties": {
+ "owner": "bryan",
+ "write.metadata.compression-codec": "gzip",
+ },
+ "current-snapshot-id": 3497810964824022504,
+ "refs": {"main": {"snapshot-id": 3497810964824022504, "type": "branch"}},
+ "snapshots": [
+ {
+ "snapshot-id": 3497810964824022504,
+ "timestamp-ms": 1646787054459,
+ "summary": {
+ "operation": "append",
+ "spark.app.id": "local-1646787004168",
+ "added-data-files": "1",
+ "added-records": "1",
+ "added-files-size": "697",
+ "changed-partition-count": "1",
+ "total-records": "1",
+ "total-files-size": "697",
+ "total-data-files": "1",
+ "total-delete-files": "0",
+ "total-position-deletes": "0",
+ "total-equality-deletes": "0",
+ },
+ "manifest-list":
"s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro",
+ "schema-id": 0,
+ }
+ ],
+ "snapshot-log": [{"timestamp-ms": 1646787054459, "snapshot-id":
3497810964824022504}],
+ "metadata-log": [
+ {
+ "timestamp-ms": 1646787031514,
+ "metadata-file":
"s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json",
+ }
+ ],
+}
+
+
+@pytest.fixture
+def example_table_metadata_with_snapshot_v1() -> Dict[str, Any]:
+ return EXAMPLE_TABLE_METADATA_WITH_SNAPSHOT_V1
+
+
+EXAMPLE_TABLE_METADATA_NO_SNAPSHOT_V1 = {
+ "format-version": 1,
+ "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
+ "location": "s3://warehouse/database/table",
+ "last-updated-ms": 1657810967051,
+ "last-column-id": 3,
+ "schema": {
+ "type": "struct",
+ "schema-id": 0,
+ "identifier-field-ids": [2],
+ "fields": [
+ {"id": 1, "name": "foo", "required": False, "type": "string"},
+ {"id": 2, "name": "bar", "required": True, "type": "int"},
+ {"id": 3, "name": "baz", "required": False, "type": "boolean"},
+ ],
+ },
+ "current-schema-id": 0,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "identifier-field-ids": [2],
+ "fields": [
+ {"id": 1, "name": "foo", "required": False, "type": "string"},
+ {"id": 2, "name": "bar", "required": True, "type": "int"},
+ {"id": 3, "name": "baz", "required": False, "type": "boolean"},
+ ],
+ }
+ ],
+ "partition-spec": [],
+ "default-spec-id": 0,
+ "last-partition-id": 999,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "properties": {
+ "write.delete.parquet.compression-codec": "zstd",
+ "write.metadata.compression-codec": "gzip",
+ "write.summary.partition-limit": "100",
+ "write.parquet.compression-codec": "zstd",
+ },
+ "current-snapshot-id": -1,
+ "refs": {},
+ "snapshots": [],
+ "snapshot-log": [],
+ "metadata-log": [],
+}
+
+
+@pytest.fixture
+def example_table_metadata_no_snapshot_v1() -> Dict[str, Any]:
+ return EXAMPLE_TABLE_METADATA_NO_SNAPSHOT_V1
+
+
EXAMPLE_TABLE_METADATA_V2 = {
"format-version": 2,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",