rdblue commented on code in PR #4706:
URL: https://github.com/apache/iceberg/pull/4706#discussion_r876129029
##########
python/tests/catalog/test_base.py:
##########
@@ -0,0 +1,418 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, List, Optional, Set, Union
+
+import pytest
+
+from iceberg.catalog.base import Catalog, Identifier, Metadata
+from iceberg.exceptions import (
+    AlreadyExistsError,
+    NamespaceNotEmptyError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+)
+from iceberg.schema import Schema
+from iceberg.table.base import PartitionSpec, Table
+
+
+class InMemoryCatalog(Catalog):
+    """An in-memory catalog implementation for testing purposes."""
+
+    __tables: Dict[Identifier, Table]
+    __namespaces: Dict[Identifier, Metadata]
+
+    def __init__(self, name: str, properties: Metadata):
+        super().__init__(name, properties)
+        self.__tables = {}
+        self.__namespaces = {}
+
+    def create_table(
+        self,
+        identifier: Union[str, Identifier],
+        schema: Schema,
+        location: Optional[str] = None,
+        partition_spec: Optional[PartitionSpec] = None,
+        properties: Optional[Metadata] = None,
+    ) -> Table:
+
+        identifier = InMemoryCatalog.identifier_to_tuple(identifier)
+        namespace = InMemoryCatalog.namespace_from(identifier)
+        name = InMemoryCatalog.name_from(identifier)
+
+        if identifier in self.__tables:
+            raise AlreadyExistsError(f"Table {name} already exists in namespace {namespace}")
+        else:
+            if namespace not in self.__namespaces:
+                self.__namespaces[namespace] = {}
+
+            table = Table()
+            self.__tables[identifier] = table
+            return table
+
+    def load_table(self, identifier: Union[str, Identifier]) -> Table:
+        identifier = InMemoryCatalog.identifier_to_tuple(identifier)
+        namespace = InMemoryCatalog.namespace_from(identifier)
+        name = InMemoryCatalog.name_from(identifier)
+        try:
+            return self.__tables[identifier]
+        except KeyError:
+            raise NoSuchTableError(f"Table {name} not found in the namespace {namespace}")
+
+    def drop_table(self, identifier: Union[str, Identifier]) -> None:
+        identifier = InMemoryCatalog.identifier_to_tuple(identifier)
+        namespace = InMemoryCatalog.namespace_from(identifier)
+        name = InMemoryCatalog.name_from(identifier)
+        try:
+            self.__tables.pop(identifier)
+        except KeyError:
+            raise NoSuchTableError(f"Table {name} not found in the namespace {namespace}")
+
+    def purge_table(self, identifier: Union[str, Identifier]) -> None:
+        self.drop_table(identifier)
+
+    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
+        from_identifier = InMemoryCatalog.identifier_to_tuple(from_identifier)
+        from_namespace = InMemoryCatalog.namespace_from(from_identifier)
+        from_name = InMemoryCatalog.name_from(from_identifier)
+        try:
+            self.__tables.pop(from_identifier)
+        except KeyError:
+            raise NoSuchTableError(f"Table {from_name} not found in the namespace {from_namespace}")
+
+        renamed_table = Table()
+        to_identifier = InMemoryCatalog.identifier_to_tuple(to_identifier)
+        to_namespace = InMemoryCatalog.namespace_from(to_identifier)
+        if to_namespace not in self.__namespaces:
+            self.__namespaces[to_namespace] = {}
+
+        self.__tables[to_identifier] = renamed_table
+        return renamed_table
+
+    def create_namespace(self, namespace: Union[str, Identifier], properties: Optional[Metadata] = None) -> None:
+        namespace = InMemoryCatalog.identifier_to_tuple(namespace)
+        if namespace in self.__namespaces:
+            raise AlreadyExistsError(f"Namespace {namespace} already exists")
+        else:
+            self.__namespaces[namespace] = properties if properties else {}
+
+    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
+        namespace = InMemoryCatalog.identifier_to_tuple(namespace)
+        if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]:
+            raise NamespaceNotEmptyError(f"Namespace {namespace} not empty")
+        try:
+            self.__namespaces.pop(namespace)
+        except KeyError:
+            raise NoSuchNamespaceError(f"Namespace {namespace} not found in the catalog")
+
+    def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]:
+        if namespace:
+            namespace = InMemoryCatalog.identifier_to_tuple(namespace)
+            list_tables = [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]
+        else:
+            list_tables = list(self.__tables.keys())
+
+        # Casting to make mypy happy
+        return list_tables
+
+    def list_namespaces(self) -> List[Identifier]:
+        return list(self.__namespaces.keys())
+
+    def load_namespace(self, namespace: Union[str, Identifier]) -> Metadata:
+        namespace = InMemoryCatalog.identifier_to_tuple(namespace)
+        try:
+            return self.__namespaces[namespace]
+        except KeyError:
+            raise NoSuchNamespaceError(f"Namespace {namespace} not found in the catalog")
+
+    def update_namespace_metadata(
+        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Optional[Metadata] = None
+    ) -> None:
+        namespace = InMemoryCatalog.identifier_to_tuple(namespace)
+        removals = {} if not removals else removals
+        updates = [] if not updates else updates
+        if namespace in self.__namespaces:
+            [self.__namespaces[namespace].pop(key) for key in removals]
+            self.__namespaces[namespace].update(updates)
+        else:
+            raise NoSuchNamespaceError(f"Namespace {namespace} not found in the catalog")
+
+    @staticmethod
+    def name_from(identifier: Union[str, Identifier]) -> str:
+        return InMemoryCatalog.identifier_to_tuple(identifier)[-1]
+
+    @staticmethod
+    def namespace_from(identifier: Union[str, Identifier]) -> Identifier:
+        return InMemoryCatalog.identifier_to_tuple(identifier)[:-1]
+
+    @staticmethod
+    def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier:
+        return identifier if isinstance(identifier, tuple) else tuple(str.split(identifier, "."))
+
+
+TEST_TABLE_IDENTIFIER = ("com", "organization", "department", "my_table")
+TEST_TABLE_NAMESPACE = ("com", "organization", "department")
+TEST_TABLE_NAME = "my_table"
+TEST_TABLE_SCHEMA = Schema(schema_id=1)
+TEST_TABLE_LOCATION = "protocol://some/location"
+TEST_TABLE_PARTITION_SPEC = PartitionSpec()
+TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"}
+
+
+def given_catalog_has_a_table(catalog: InMemoryCatalog) -> Table:
+    return catalog.create_table(
+        identifier=TEST_TABLE_IDENTIFIER,
+        schema=TEST_TABLE_SCHEMA,
+        location=TEST_TABLE_LOCATION,
+        partition_spec=TEST_TABLE_PARTITION_SPEC,
+        properties=TEST_TABLE_PROPERTIES,
+    )
+
+
+def test_namespace_from_tuple():
+    # Given
+    identifier = ("com", "organization", "department", "my_table")
+    # When
+    namespace_from = InMemoryCatalog.namespace_from(identifier)
+    # Then
+    assert namespace_from == ("com", "organization", "department")
+
+
+def test_namespace_from_str():
+    # Given
+    identifier = "com.organization.department.my_table"
+    # When
+    namespace_from = InMemoryCatalog.namespace_from(identifier)
+    # Then
+    assert namespace_from == ("com", "organization", "department")
+
+
+def test_name_from_tuple():
+    # Given
+    identifier = ("com", "organization", "department", "my_table")
+    # When
+    name_from = InMemoryCatalog.name_from(identifier)
+    # Then
+    assert name_from == "my_table"
+
+
+def test_name_from_str():
+    # Given
+    identifier = "com.organization.department.my_table"
+    # When
+    name_from = InMemoryCatalog.name_from(identifier)
+    # Then
+    assert name_from == "my_table"
+
+
+def test_create_table(catalog: InMemoryCatalog):
+    table = catalog.create_table(
+        identifier=TEST_TABLE_IDENTIFIER,
+        schema=TEST_TABLE_SCHEMA,
+        location=TEST_TABLE_LOCATION,
+        partition_spec=TEST_TABLE_PARTITION_SPEC,
+        properties=TEST_TABLE_PROPERTIES,
+    )
+    assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table

Review Comment:
When creating a table, I think this should validate that the table looks as expected by validating the schema, partition spec, location, and properties.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
