dhruv-pratap commented on code in PR #4706: URL: https://github.com/apache/iceberg/pull/4706#discussion_r877542835
########## python/tests/catalog/test_base.py: ########## @@ -0,0 +1,410 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Dict, List, Optional, Set, Union + +import pytest + +from iceberg.catalog.base import Catalog, Identifier, Properties +from iceberg.exceptions import ( + AlreadyExistsError, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, +) +from iceberg.schema import Schema +from iceberg.table.base import PartitionSpec, Table + + +class InMemoryCatalog(Catalog): + """An in-memory catalog implementation for testing purposes.""" + + __tables: Dict[Identifier, Table] + __namespaces: Dict[Identifier, Properties] + + def __init__(self, name: str, properties: Properties): + super().__init__(name, properties) + self.__tables = {} + self.__namespaces = {} + + def create_table( + self, + identifier: Union[str, Identifier], + schema: Schema, + location: Optional[str] = None, + partition_spec: Optional[PartitionSpec] = None, + properties: Optional[Properties] = None, + ) -> Table: + + identifier = Catalog.identifier_to_tuple(identifier) + namespace = Catalog.namespace_from(identifier) + name = Catalog.table_name_from(identifier) + + if identifier in self.__tables: + raise 
AlreadyExistsError(f"Table {name} already exists in namespace {namespace}") + else: + if namespace not in self.__namespaces: + self.__namespaces[namespace] = {} + + table = Table() + self.__tables[identifier] = table + return table + + def load_table(self, identifier: Union[str, Identifier]) -> Table: + identifier = Catalog.identifier_to_tuple(identifier) + namespace = Catalog.namespace_from(identifier) + name = Catalog.table_name_from(identifier) + try: + return self.__tables[identifier] + except KeyError as error: + raise NoSuchTableError(f"Table {name} not found in the namespace {namespace}") from error + + def drop_table(self, identifier: Union[str, Identifier]) -> None: + identifier = Catalog.identifier_to_tuple(identifier) + namespace = Catalog.namespace_from(identifier) + name = Catalog.table_name_from(identifier) + try: + self.__tables.pop(identifier) + except KeyError as error: + raise NoSuchTableError(f"Table {name} not found in the namespace {namespace}") from error + + def purge_table(self, identifier: Union[str, Identifier]) -> None: + self.drop_table(identifier) + + def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: + from_identifier = Catalog.identifier_to_tuple(from_identifier) + from_namespace = Catalog.namespace_from(from_identifier) + from_name = Catalog.table_name_from(from_identifier) + try: + self.__tables.pop(from_identifier) + except KeyError as error: + raise NoSuchTableError(f"Table {from_name} not found in the namespace {from_namespace}") from error + + renamed_table = Table() + to_identifier = Catalog.identifier_to_tuple(to_identifier) + to_namespace = Catalog.namespace_from(to_identifier) + if to_namespace not in self.__namespaces: + self.__namespaces[to_namespace] = {} + + self.__tables[to_identifier] = renamed_table + return renamed_table + + def create_namespace(self, namespace: Union[str, Identifier], properties: Optional[Properties] = None) -> None: + namespace = 
Catalog.identifier_to_tuple(namespace) + if namespace in self.__namespaces: + raise AlreadyExistsError(f"Namespace {namespace} already exists") + else: + self.__namespaces[namespace] = properties if properties else {} + + def drop_namespace(self, namespace: Union[str, Identifier]) -> None: + namespace = Catalog.identifier_to_tuple(namespace) + if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]: + raise NamespaceNotEmptyError(f"Namespace {namespace} not empty") + try: + self.__namespaces.pop(namespace) + except KeyError as error: + raise NoSuchNamespaceError(f"Namespace {namespace} not found in the catalog") from error + + def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]: + if namespace: + namespace = Catalog.identifier_to_tuple(namespace) + list_tables = [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]] + else: + list_tables = list(self.__tables.keys()) + + return list_tables + + def list_namespaces(self) -> List[Identifier]: + return list(self.__namespaces.keys()) + + def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: + namespace = Catalog.identifier_to_tuple(namespace) + try: + return self.__namespaces[namespace] + except KeyError as error: + raise NoSuchNamespaceError(f"Namespace {namespace} not found in the catalog") from error + + def update_namespace_properties( + self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Optional[Properties] = None + ) -> None: + namespace = Catalog.identifier_to_tuple(namespace) + removals = {} if not removals else removals + updates = [] if not updates else updates + if namespace in self.__namespaces: + for key in removals: + if key in self.__namespaces[namespace]: + del self.__namespaces[namespace][key] + self.__namespaces[namespace].update(updates) + else: + raise NoSuchNamespaceError(f"Namespace {namespace} 
not found in the catalog") + + +TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table") +TEST_TABLE_NAMESPACE = ("com", "organization", "department") +TEST_TABLE_NAME = "my_table" +TEST_TABLE_SCHEMA = Schema(schema_id=1) +TEST_TABLE_LOCATION = "protocol://some/location" +TEST_TABLE_PARTITION_SPEC = PartitionSpec() +TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"} + + +def given_catalog_has_a_table(catalog: InMemoryCatalog) -> Table: + return catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + location=TEST_TABLE_LOCATION, + partition_spec=TEST_TABLE_PARTITION_SPEC, + properties=TEST_TABLE_PROPERTIES, + ) + + +def test_namespace_from_tuple(): + # Given + identifier = ("com", "organization", "department", "my_table") + # When + namespace_from = Catalog.namespace_from(identifier) + # Then + assert namespace_from == ("com", "organization", "department") + + +def test_namespace_from_str(): + # Given + identifier = "com.organization.department.my_table" + # When + namespace_from = Catalog.namespace_from(identifier) + # Then + assert namespace_from == ("com", "organization", "department") + + +def test_name_from_tuple(): + # Given + identifier = ("com", "organization", "department", "my_table") + # When + name_from = Catalog.table_name_from(identifier) + # Then + assert name_from == "my_table" + + +def test_name_from_str(): + # Given + identifier = "com.organization.department.my_table" + # When + name_from = Catalog.table_name_from(identifier) + # Then + assert name_from == "my_table" + + +def test_create_table(catalog: InMemoryCatalog): + table = catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + location=TEST_TABLE_LOCATION, + partition_spec=TEST_TABLE_PARTITION_SPEC, + properties=TEST_TABLE_PROPERTIES, + ) + assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table + + +def test_create_table_raises_error_when_table_already_exists(catalog: InMemoryCatalog): + # Given 
+ given_catalog_has_a_table(catalog) + # When + with pytest.raises(AlreadyExistsError, match="Table my_table already exists"): + catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + ) + + +def test_load_table(catalog: InMemoryCatalog): + # Given + given_table = given_catalog_has_a_table(catalog) + # When + table = catalog.load_table(TEST_TABLE_IDENTIFIER) + # Then + assert table == given_table + + +def test_table_raises_error_on_table_not_found(catalog: InMemoryCatalog): + with pytest.raises(NoSuchTableError, match="Table my_table not found"): + catalog.load_table(TEST_TABLE_IDENTIFIER) + + +def test_drop_table(catalog: InMemoryCatalog): + # Given + given_catalog_has_a_table(catalog) + # When + catalog.drop_table(TEST_TABLE_IDENTIFIER) + # Then + with pytest.raises(NoSuchTableError, match="Table my_table not found"): + catalog.load_table(TEST_TABLE_IDENTIFIER) + + +def test_drop_table_that_does_not_exist_raise_error(catalog: InMemoryCatalog): + with pytest.raises(NoSuchTableError, match="Table my_table not found"): + catalog.load_table(TEST_TABLE_IDENTIFIER) + + +def test_purge_table(catalog: InMemoryCatalog): + # Given + given_catalog_has_a_table(catalog) + # When + catalog.purge_table(TEST_TABLE_IDENTIFIER) + # Then + with pytest.raises(NoSuchTableError, match="Table my_table not found"): + catalog.load_table(TEST_TABLE_IDENTIFIER) + + +def test_rename_table(catalog: InMemoryCatalog): + # Given + given_table = given_catalog_has_a_table(catalog) + + # When + new_table = "new.namespace.new_table" + table = catalog.rename_table(TEST_TABLE_IDENTIFIER, new_table) + + # Then + assert table + assert table is not given_table Review Comment: @rdblue In the earlier commits I was asserting that the table name had changed, but since I had to roll back any changes to the `Table` interface in this PR, I resorted to checking just that the table changes in some form after renaming. 
I will eventually change this to assert the table name change once we add attributes back to `Table` in #3227. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
