jackye1995 commented on code in PR #5870: URL: https://github.com/apache/iceberg/pull/5870#discussion_r984966239
########## python/pyiceberg/catalog/glue.py: ########## @@ -0,0 +1,246 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import getpass +import uuid + +import boto3 +from datetime import datetime +from typing import Union, Optional, List, Set, Dict + +from pyiceberg.catalog.hive import OWNER + +from pyiceberg.exceptions import ( + NamespaceAlreadyExistsError, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + TableAlreadyExistsError, +) + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.io import FileIO, load_file_io +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile, ToOutputFile +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.partitioning import PartitionSpec, UNPARTITIONED_PARTITION_SPEC +from pyiceberg.table.sorting import SortOrder, UNSORTED_SORT_ORDER, SortDirection +from pyiceberg.typedef import EMPTY_DICT + +from pyiceberg.types import NestedField + +ICEBERG = "ICEBERG" +EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE" + +PROP_TABLE_TYPE = "table_type" +PROP_WAREHOUSE = "warehouse" +PROP_METADATA_LOCATION = "metadata_location" 
+PROP_TABLE_DESCRIPTION = "description" + +PROP_GLUE_TABLE = "Table" +PROP_GLUE_TABLE_PARAMETERS = "Parameters" +PROP_GLUE_TABLE_DATABASE_NAME = "DatabaseName" +PROP_GLUE_TABLE_NAME = "Name" + +PROP_GLUE_DATABASE = "Database" +PROP_GLUE_DATABASE_LOCATION = "LocationUri" + + +def _construct_parameters(metadata_location: str) -> Dict[str, str]: + properties = {PROP_TABLE_TYPE: ICEBERG, PROP_METADATA_LOCATION: metadata_location} + return properties + + +def _convert_glue_to_iceberg(glue_table, io: FileIO) -> Table: + properties: Dict[str, str] = glue_table[PROP_GLUE_TABLE_PARAMETERS] + + if PROP_TABLE_TYPE not in properties: + raise NoSuchTableError( + f"Property table_type missing, could not determine type: " + f"{glue_table[PROP_GLUE_TABLE_DATABASE_NAME]}.{glue_table[PROP_GLUE_TABLE_NAME]}") + glue_table_type = properties.get(PROP_TABLE_TYPE) + if glue_table_type != ICEBERG: + raise NoSuchTableError( + f"Property table_type is {glue_table_type}, expected {ICEBERG}: " + f"{glue_table[PROP_GLUE_TABLE_DATABASE_NAME]}.{glue_table[PROP_GLUE_TABLE_NAME]}") + if prop_meta_location := properties.get(PROP_METADATA_LOCATION): + metadata_location = prop_meta_location + else: + raise NoSuchTableError(f"Table property {PROP_METADATA_LOCATION} is missing") + + file = io.new_input(metadata_location) + metadata = FromInputFile.table_metadata(file) + return Table( + identifier=(glue_table[PROP_GLUE_TABLE_DATABASE_NAME], glue_table[PROP_GLUE_TABLE_NAME]), + metadata=metadata, + metadata_location=metadata_location + ) + + +def _write_metadata(metadata: TableMetadata, io: FileIO, metadate_path: str): + ToOutputFile.table_metadata(metadata, io.new_output(metadate_path)) + + +class GlueCatalog(Catalog): + + def __init__(self, name: str, **properties: Properties): + super().__init__(name, **properties) + self.glue = boto3.client("glue") + + def _default_warehouse_location(self, database_name: str, table_name: str): + try: + response = self.glue.get_database(Name=database_name) + except 
self.glue.exceptions.EntityNotFoundException: + raise NoSuchNamespaceError(f"The database: {database_name} does not exist") + + if PROP_GLUE_DATABASE_LOCATION in response[PROP_GLUE_DATABASE]: + return f"{response[PROP_GLUE_DATABASE][PROP_GLUE_DATABASE]}/table_name" + + if PROP_WAREHOUSE in self.properties: + return f"{self.properties[PROP_WAREHOUSE]}/{database_name}.db/{table_name}" + + raise ValueError("No default path is set, please specify a location when creating a table") + + def _resolve_table_location(self, location: Optional[str], database_name: str, table_name: str): + if not location: + return self._default_warehouse_location(database_name, table_name) + return location + + # tested on pre-existing database + def create_table( + self, + identifier: Union[str, Identifier], + schema: Schema, + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> Table: + """Create a table + + Args: + identifier: Table identifier. + schema: Table's schema. + location: Location for the table. Optional Argument. + partition_spec: PartitionSpec for the table. + sort_order: SortOrder for the table. + properties: Table properties that can be a string based dictionary. 
+ + Returns: + Table: the created table instance + + Raises: + AlreadyExistsError: If a table with the name already exists + ValueError: If the identifier is invalid + """ + database_name, table_name = self.identifier_to_tuple(identifier) + + location = self._resolve_table_location(location, database_name, table_name) + metadata_location = f"{location}/metadata/00000-{uuid.uuid4()}.metadata.json" + metadata = new_table_metadata( + location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, + properties=properties + ) + io = load_file_io({**self.properties, **properties}, location=location) + _write_metadata(metadata, io, metadata_location) + try: + self.glue.create_table( + DatabaseName=database_name, + TableInput={ + 'Name': table_name, + 'Description': properties[PROP_TABLE_DESCRIPTION] Review Comment: Is the description required to create the Glue table? If not, we should use None instead of "" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
