Fokko commented on code in PR #5870: URL: https://github.com/apache/iceberg/pull/5870#discussion_r985715547
########## python/pyiceberg/catalog/glue.py: ########## @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import uuid +from typing import ( + Any, + Dict, + List, + Optional, + Set, + Union, +) + +import boto3 + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError +from pyiceberg.io import FileIO, load_file_io +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile, ToOutputFile +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.typedef import EMPTY_DICT + +ICEBERG = "ICEBERG" +EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE" + +PROP_TABLE_TYPE = "table_type" +PROP_WAREHOUSE = "warehouse" +PROP_METADATA_LOCATION = "metadata_location" + +PROP_GLUE_TABLE = "Table" +Prop_GLUE_TABLE_TYPE = "TableType" +PROP_GLUE_TABLE_DESCRIPTION = "description" +PROP_GLUE_TABLE_PARAMETERS = "Parameters" +PROP_GLUE_TABLE_DATABASE_NAME = "DatabaseName" +PROP_GLUE_TABLE_NAME = "Name" + +PROP_GLUE_DATABASE = "Database" +PROP_GLUE_DATABASE_LOCATION = "LocationUri" + + +def _construct_parameters(metadata_location: str) -> Dict[str, str]: + properties = {PROP_TABLE_TYPE: ICEBERG, PROP_METADATA_LOCATION: metadata_location} Review Comment: I would collapse this into a single line and return right away ########## python/pyproject.toml: ########## @@ -63,6 +63,7 @@ python-snappy = { version = "^0.6.1", optional = true } thrift = { version = "^0.16.0", optional = true } s3fs = { version = "2022.8.2", optional = true } +boto3 = "1.24.59" Review Comment: Could you make this one optional? Similar to Hive. Because not everyone uses Glue or Boto, this makes the package quite heavy to install. ########## python/pyiceberg/catalog/glue.py: ########## @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import uuid +from typing import ( + Any, + Dict, + List, + Optional, + Set, + Union, +) + +import boto3 + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError +from pyiceberg.io import FileIO, load_file_io +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile, ToOutputFile +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.typedef import EMPTY_DICT + +ICEBERG = "ICEBERG" +EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE" + +PROP_TABLE_TYPE = "table_type" +PROP_WAREHOUSE = "warehouse" +PROP_METADATA_LOCATION = "metadata_location" + +PROP_GLUE_TABLE = "Table" +Prop_GLUE_TABLE_TYPE = "TableType" +PROP_GLUE_TABLE_DESCRIPTION = "description" +PROP_GLUE_TABLE_PARAMETERS = "Parameters" +PROP_GLUE_TABLE_DATABASE_NAME = "DatabaseName" +PROP_GLUE_TABLE_NAME = "Name" + +PROP_GLUE_DATABASE = "Database" +PROP_GLUE_DATABASE_LOCATION = "LocationUri" + + +def _construct_parameters(metadata_location: str) -> Dict[str, str]: Review Comment: ```suggestion def _construct_parameters(metadata_location: str) -> Properties: ``` ########## python/pyiceberg/catalog/glue.py: ########## @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import uuid +from typing import ( + Any, + Dict, + List, + Optional, + Set, + Union, +) + +import boto3 + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError +from pyiceberg.io import FileIO, load_file_io +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile, ToOutputFile +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.typedef import EMPTY_DICT + +ICEBERG = "ICEBERG" +EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE" + +PROP_TABLE_TYPE = "table_type" +PROP_WAREHOUSE = "warehouse" +PROP_METADATA_LOCATION = "metadata_location" + +PROP_GLUE_TABLE = "Table" +Prop_GLUE_TABLE_TYPE = "TableType" Review Comment: ```suggestion PROP_GLUE_TABLE_TYPE = "TableType" ``` ########## python/pyiceberg/catalog/glue.py: ########## @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import uuid +from typing import ( + Any, + Dict, + List, + Optional, + Set, + Union, +) + +import boto3 + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError +from pyiceberg.io import FileIO, load_file_io +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile, ToOutputFile +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.typedef import EMPTY_DICT + +ICEBERG = "ICEBERG" +EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE" + +PROP_TABLE_TYPE = "table_type" +PROP_WAREHOUSE = "warehouse" +PROP_METADATA_LOCATION = "metadata_location" + +PROP_GLUE_TABLE = "Table" +Prop_GLUE_TABLE_TYPE = "TableType" +PROP_GLUE_TABLE_DESCRIPTION = "description" +PROP_GLUE_TABLE_PARAMETERS = "Parameters" +PROP_GLUE_TABLE_DATABASE_NAME = "DatabaseName" +PROP_GLUE_TABLE_NAME = "Name" + +PROP_GLUE_DATABASE = "Database" +PROP_GLUE_DATABASE_LOCATION = "LocationUri" + + +def _construct_parameters(metadata_location: str) -> Dict[str, str]: + properties = {PROP_TABLE_TYPE: ICEBERG, PROP_METADATA_LOCATION: metadata_location} + return properties + + +def _construct_table_input(table_name: str, metadata_location: str, properties: Dict[str, str]) -> Dict[str, Any]: Review Comment: ```suggestion def _construct_table_input(table_name: str, metadata_location: str, properties: Properties) -> Properties: ``` ########## python/pyiceberg/catalog/glue.py: ########## @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import uuid +from typing import ( + Any, + Dict, + List, + Optional, + Set, + Union, +) + +import boto3 + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError +from pyiceberg.io import FileIO, load_file_io +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile, ToOutputFile +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.typedef import EMPTY_DICT + +ICEBERG = "ICEBERG" +EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE" + +PROP_TABLE_TYPE = "table_type" +PROP_WAREHOUSE = "warehouse" +PROP_METADATA_LOCATION = "metadata_location" + +PROP_GLUE_TABLE = "Table" +Prop_GLUE_TABLE_TYPE = "TableType" +PROP_GLUE_TABLE_DESCRIPTION = "description" +PROP_GLUE_TABLE_PARAMETERS = "Parameters" +PROP_GLUE_TABLE_DATABASE_NAME = "DatabaseName" +PROP_GLUE_TABLE_NAME = "Name" + +PROP_GLUE_DATABASE = "Database" +PROP_GLUE_DATABASE_LOCATION = "LocationUri" + + +def _construct_parameters(metadata_location: str) -> Dict[str, str]: + properties = {PROP_TABLE_TYPE: ICEBERG, PROP_METADATA_LOCATION: metadata_location} + return properties + + +def _construct_table_input(table_name: str, metadata_location: str, properties: Dict[str, str]) -> Dict[str, Any]: + table_input = { + PROP_GLUE_TABLE_NAME: table_name, + Prop_GLUE_TABLE_TYPE: EXTERNAL_TABLE_TYPE, + PROP_GLUE_TABLE_PARAMETERS: _construct_parameters(metadata_location), + } + + if properties and PROP_GLUE_TABLE_DESCRIPTION in properties: + table_input[PROP_GLUE_TABLE_DESCRIPTION] = properties[PROP_GLUE_TABLE_DESCRIPTION] + + return table_input + + +def _convert_glue_to_iceberg(glue_table, io: FileIO) -> Table: + properties: Dict[str, str] = glue_table[PROP_GLUE_TABLE_PARAMETERS] Review Comment: ```suggestion properties: Properties = glue_table[PROP_GLUE_TABLE_PARAMETERS] ``` ########## python/pyiceberg/catalog/glue.py: ########## @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import uuid +from typing import ( + Any, + Dict, + List, + Optional, + Set, + Union, +) + +import boto3 + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError +from pyiceberg.io import FileIO, load_file_io +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile, ToOutputFile +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.typedef import EMPTY_DICT + +ICEBERG = "ICEBERG" +EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE" + +PROP_TABLE_TYPE = "table_type" +PROP_WAREHOUSE = "warehouse" +PROP_METADATA_LOCATION = "metadata_location" + +PROP_GLUE_TABLE = "Table" +Prop_GLUE_TABLE_TYPE = "TableType" +PROP_GLUE_TABLE_DESCRIPTION = "description" +PROP_GLUE_TABLE_PARAMETERS = "Parameters" +PROP_GLUE_TABLE_DATABASE_NAME = "DatabaseName" +PROP_GLUE_TABLE_NAME = "Name" + +PROP_GLUE_DATABASE = "Database" +PROP_GLUE_DATABASE_LOCATION = "LocationUri" + + +def _construct_parameters(metadata_location: str) -> Dict[str, str]: + properties = {PROP_TABLE_TYPE: ICEBERG, PROP_METADATA_LOCATION: metadata_location} + return properties + + +def _construct_table_input(table_name: str, metadata_location: str, properties: Dict[str, str]) -> Dict[str, Any]: + table_input = { + PROP_GLUE_TABLE_NAME: table_name, + Prop_GLUE_TABLE_TYPE: EXTERNAL_TABLE_TYPE, + PROP_GLUE_TABLE_PARAMETERS: _construct_parameters(metadata_location), + } + + if properties and PROP_GLUE_TABLE_DESCRIPTION in properties: Review Comment: You could also leverage the walrus operator here: ```suggestion if table_description := properties.get(PROP_GLUE_TABLE_DESCRIPTION): ``` ########## python/pyiceberg/catalog/glue.py: ########## @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import uuid +from typing import ( + Any, + Dict, + List, + Optional, + Set, + Union, +) + +import boto3 + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError +from pyiceberg.io import FileIO, load_file_io +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile, ToOutputFile +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.typedef import EMPTY_DICT + +ICEBERG = "ICEBERG" +EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE" + +PROP_TABLE_TYPE = "table_type" +PROP_WAREHOUSE = "warehouse" +PROP_METADATA_LOCATION = "metadata_location" + +PROP_GLUE_TABLE = "Table" +Prop_GLUE_TABLE_TYPE = "TableType" +PROP_GLUE_TABLE_DESCRIPTION = "description" +PROP_GLUE_TABLE_PARAMETERS = "Parameters" +PROP_GLUE_TABLE_DATABASE_NAME = "DatabaseName" +PROP_GLUE_TABLE_NAME = "Name" + +PROP_GLUE_DATABASE = "Database" +PROP_GLUE_DATABASE_LOCATION = "LocationUri" + + +def _construct_parameters(metadata_location: str) -> Dict[str, str]: + properties = {PROP_TABLE_TYPE: ICEBERG, PROP_METADATA_LOCATION: metadata_location} + return properties + + +def _construct_table_input(table_name: str, metadata_location: str, properties: Dict[str, str]) -> Dict[str, Any]: + table_input = { + PROP_GLUE_TABLE_NAME: table_name, + Prop_GLUE_TABLE_TYPE: EXTERNAL_TABLE_TYPE, + PROP_GLUE_TABLE_PARAMETERS: _construct_parameters(metadata_location), + } + + if properties and PROP_GLUE_TABLE_DESCRIPTION in properties: + table_input[PROP_GLUE_TABLE_DESCRIPTION] = properties[PROP_GLUE_TABLE_DESCRIPTION] + + return table_input + + +def _convert_glue_to_iceberg(glue_table, io: FileIO) -> Table: + properties: Dict[str, str] = glue_table[PROP_GLUE_TABLE_PARAMETERS] + + if PROP_TABLE_TYPE not in properties: + raise NoSuchTableError( + f"Property table_type missing, could not determine type: " + f"{glue_table[PROP_GLUE_TABLE_DATABASE_NAME]}.{glue_table[PROP_GLUE_TABLE_NAME]}" + ) + glue_table_type = properties.get(PROP_TABLE_TYPE) + if glue_table_type != ICEBERG: + raise NoSuchTableError( + f"Property table_type is {glue_table_type}, expected {ICEBERG}: " + f"{glue_table[PROP_GLUE_TABLE_DATABASE_NAME]}.{glue_table[PROP_GLUE_TABLE_NAME]}" + ) + if prop_meta_location := properties.get(PROP_METADATA_LOCATION): + metadata_location = prop_meta_location + else: + raise NoSuchTableError(f"Table property {PROP_METADATA_LOCATION} is missing") + + file = io.new_input(metadata_location) + metadata = FromInputFile.table_metadata(file) + return Table( + identifier=(glue_table[PROP_GLUE_TABLE_DATABASE_NAME], glue_table[PROP_GLUE_TABLE_NAME]), + metadata=metadata, + metadata_location=metadata_location, + ) + + +def _write_metadata(metadata: TableMetadata, io: FileIO, metadate_path: str): + ToOutputFile.table_metadata(metadata, io.new_output(metadate_path)) + + +class GlueCatalog(Catalog): + def __init__(self, name: str, **properties: str): + super().__init__(name, **properties) + self.glue = boto3.client("glue") + + def _default_warehouse_location(self, database_name: str, table_name: str): + try: + response = self.glue.get_database(Name=database_name) + except self.glue.exceptions.EntityNotFoundException as e: + raise NoSuchNamespaceError(f"The database: {database_name} does not exist") from e + + if PROP_GLUE_DATABASE_LOCATION in response[PROP_GLUE_DATABASE]: + return f"{response[PROP_GLUE_DATABASE][PROP_GLUE_DATABASE]}/{table_name}" + + if PROP_WAREHOUSE in self.properties: + return f"{self.properties[PROP_WAREHOUSE]}/{database_name}.db/{table_name}" + + raise ValueError("No default path is set, please specify a location when creating a table") + + def _resolve_table_location(self, location: Optional[str], database_name: str, table_name: str) -> str: + if not location: + return self._default_warehouse_location(database_name, table_name) + return location + + def create_table( + self, + identifier: Union[str, Identifier], + schema: Schema, + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> Table: + """Create an Iceberg table in Glue catalog + + Args: + identifier: Table identifier. + schema: Table's schema. + location: Location for the table. Optional Argument. + partition_spec: PartitionSpec for the table. + sort_order: SortOrder for the table. + properties: Table properties that can be a string based dictionary. + + Returns: + Table: the created table instance + + Raises: + AlreadyExistsError: If a table with the name already exists + ValueError: If the identifier is invalid + """ + database_name, table_name = self.identifier_to_tuple(identifier) + + location = self._resolve_table_location(location, database_name, table_name) + metadata_location = f"{location}/metadata/00000-{uuid.uuid4()}.metadata.json" + metadata = new_table_metadata( + location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties + ) + io = load_file_io({**self.properties, **properties}, location=location) + _write_metadata(metadata, io, metadata_location) + try: + self.glue.create_table( + DatabaseName=database_name, TableInput=_construct_table_input(table_name, metadata_location, properties) + ) + except self.glue.exceptions.AlreadyExistsException as e: + raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e + except self.glue.exceptions.EntityNotFoundException as e: + raise NoSuchNamespaceError(f"Database {database_name} not found") from e + + try: + load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name) + except self.glue.exceptions.EntityNotFoundException as e: + raise NoSuchTableError(f"Table {database_name}.{table_name} fail to be created") from e + + glue_table = load_table_response[PROP_GLUE_TABLE] + return _convert_glue_to_iceberg(glue_table, io) + + def load_table(self, identifier: Union[str, Identifier]) -> Table: + """Loads the table's metadata and returns the table instance. + + You can also use this method to check for table existence using 'try catalog.table() except TableNotFoundError' + Note: This method doesn't scan data stored in the table. + + Args: + identifier: Table identifier. + + Returns: + Table: the table instance with its metadata + + Raises: + NoSuchTableError: If a table with the name does not exist, or the identifier is invalid + """ + database_name, table_name = self.identifier_to_tuple(identifier) + try: + load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name) + except self.glue.exceptions.EntityNotFoundException as e: + raise NoSuchTableError(f"Table does not exists: {table_name}") from e + loaded_table = load_table_response[PROP_GLUE_TABLE] + io = load_file_io(self.properties, loaded_table[PROP_GLUE_TABLE_PARAMETERS][PROP_METADATA_LOCATION]) + return _convert_glue_to_iceberg(loaded_table, io) + + def drop_table(self, identifier: Union[str, Identifier]) -> None: + raise NotImplementedError("currently unsupported") + + def purge_table(self, identifier: Union[str, Identifier]) -> None: + self.drop_table(identifier) + + def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: + raise NotImplementedError("currently unsupported") + + def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: + raise NotImplementedError("currently unsupported") + + def drop_namespace(self, namespace: Union[str, Identifier]) -> None: + raise NotImplementedError("currently unsupported") + + def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + raise NotImplementedError("currently unsupported") + + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + raise NotImplementedError("currently unsupported") Review Comment: Can you implement this one? This makes it easier to test. Should be something like `self.glue.get_databases()` ########## python/tests/catalog/test_glue.py: ########## @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import getpass as gt +import random +import string + +import pytest + +from pyiceberg.catalog.glue import GlueCatalog +from pyiceberg.exceptions import NoSuchNamespaceError +from pyiceberg.schema import Schema Review Comment: For testing we could also use moto: https://docs.getmoto.org/en/latest/docs/services/glue.html What do you think @jackye1995 ? ########## python/pyiceberg/catalog/glue.py: ########## @@ -0,0 +1,242 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import uuid +from typing import ( + Any, + Dict, + List, + Optional, + Set, + Union, +) + +import boto3 + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError +from pyiceberg.io import FileIO, load_file_io +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile, ToOutputFile +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.typedef import EMPTY_DICT + +ICEBERG = "ICEBERG" +EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE" + +PROP_TABLE_TYPE = "table_type" +PROP_WAREHOUSE = "warehouse" +PROP_METADATA_LOCATION = "metadata_location" + +PROP_GLUE_TABLE = "Table" +Prop_GLUE_TABLE_TYPE = "TableType" +PROP_GLUE_TABLE_DESCRIPTION = "description" +PROP_GLUE_TABLE_PARAMETERS = "Parameters" +PROP_GLUE_TABLE_DATABASE_NAME = "DatabaseName" +PROP_GLUE_TABLE_NAME = "Name" + +PROP_GLUE_DATABASE = "Database" +PROP_GLUE_DATABASE_LOCATION = "LocationUri" + + +def _construct_parameters(metadata_location: str) -> Dict[str, str]: + properties = {PROP_TABLE_TYPE: ICEBERG, PROP_METADATA_LOCATION: metadata_location} + return properties + + +def _construct_table_input(table_name: str, metadata_location: str, properties: Dict[str, str]) -> Dict[str, Any]: + table_input = { + PROP_GLUE_TABLE_NAME: table_name, + Prop_GLUE_TABLE_TYPE: EXTERNAL_TABLE_TYPE, + PROP_GLUE_TABLE_PARAMETERS: _construct_parameters(metadata_location), + } + + if properties and PROP_GLUE_TABLE_DESCRIPTION in properties: + table_input[PROP_GLUE_TABLE_DESCRIPTION] = properties[PROP_GLUE_TABLE_DESCRIPTION] + + return table_input + + +def _convert_glue_to_iceberg(glue_table, io: FileIO) -> Table: Review Comment: ```suggestion def _convert_glue_to_iceberg(glue_table: Dict[str, Any], io: FileIO) -> Table: ``` ########## python/pyiceberg/catalog/glue.py: ########## @@ -0,0 +1,202 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import getpass +import uuid + +import boto3 +from datetime import datetime +from typing import Union, Optional, List, Set, Dict + +from pyiceberg.catalog.hive import OWNER + +from pyiceberg.exceptions import ( + NamespaceAlreadyExistsError, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + TableAlreadyExistsError, +) + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.io import FileIO, load_file_io +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile, ToOutputFile +from pyiceberg.table import Table +from pyiceberg.table.metadata import TableMetadata, new_table_metadata +from pyiceberg.table.partitioning import PartitionSpec, UNPARTITIONED_PARTITION_SPEC +from pyiceberg.table.sorting import SortOrder, UNSORTED_SORT_ORDER, SortDirection +from pyiceberg.typedef import EMPTY_DICT + +from pyiceberg.types import NestedField + +METADATA_LOCATION = "metadata_location" +ICEBERG = "iceberg" + + +class GlueCatalog(Catalog): + + def __init__(self, name: str, **properties: Properties): + super().__init__(name, **properties) + self.client = boto3.client("glue") + self.sts_client = boto3.client("sts") + + + def _check_response(self, response: Dict[str, Dict[str, str]]): + if response['ResponseMetadata']['HTTPStatusCode'] != 200: + raise ValueError(f"Got unexpected status code {response['HttpStatusCode']}") + + def _glue_to_iceberg(self, glue_table, io: FileIO) -> Table: + properties: Dict[str, str] = glue_table["Parameters"] + + if "table_type" not in properties: + raise NoSuchTableError( + f"Property table_type missing, could not determine type: {glue_table['DatabaseName']}.{glue_table['Name']}") + glue_table_type = properties.get("table_type") + if glue_table_type.lower() != ICEBERG: + raise NoSuchTableError( + f"Property table_type is {glue_table_type}, expected {ICEBERG}: {glue_table['DatabaseName']}.{glue_table['Name']}") + if prop_meta_location := properties.get(METADATA_LOCATION): + metadata_location = prop_meta_location + else: + raise NoSuchTableError(f"Table property {METADATA_LOCATION} is missing") + + file = io.new_input(metadata_location) + metadata = FromInputFile.table_metadata(file) + return Table( + identifier=(glue_table['DatabaseName'], glue_table['Name']), + metadata=metadata, + metadata_location=metadata_location + ) + + def _iceberg_to_glue(self, iceberg_table): + # TODO + pass + + def _construct_parameters(self, metadata_location: str) -> Dict[str, str]: + properties = {"table_type": "ICEBERG", "metadata_location": metadata_location} + return properties + + def _default_warehouse_location(self, database_name: str, table_name: str): + try: + response = self.client.get_database(Name=database_name) + # TODO: handle response and errors + except: + raise NoSuchNamespaceError("Database not found") + + if "LocationUri" in response["Database"]: + return f"{response['Database']['LocationUri']}/table_name" + + # TODO: should extract warehouse path from the properties and handle potential errors + return f"{self.properties['warehouse_path']}/{database_name}.db/{table_name}" + + def _resolve_table_location(self, location: Optional[str], database_name: str, table_name: str): + if not location: + return self._default_warehouse_location(database_name, table_name) + return location + + def _write_metadata(self, metadata: TableMetadata, io: FileIO, metadate_path: str): + ToOutputFile.table_metadata(metadata, io.new_output(metadate_path)) + + # tested on pre-existing database + def create_table( + self, + identifier: Union[str, Identifier], + schema: Schema, + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> Table: + database_name, table_name = self.identifier_to_tuple(identifier) + + location = self._resolve_table_location(location, database_name, table_name) + # TODO: give it correct path based on java version of glueCatalog + metadata_location = f"{location}/metadata/{uuid.uuid4()}.metadata.json" + metadata = new_table_metadata( + location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, + properties=properties + ) + io = load_file_io({**self.properties, **properties}, location=location) + self._write_metadata(metadata, io, metadata_location) + try: + create_table_response = self.client.create_table( + DatabaseName=database_name, + TableInput={ + 'Name': table_name, + 'Description': '', # To be fixed + 'TableType': 'EXTERNAL_TABLE', + 'Parameters': self._construct_parameters(metadata_location), + } + ) + # TODO: check response + load_table_response = self.client.get_table(DatabaseName=database_name, Name=table_name) + glue_table = load_table_response['Table'] + except self.client.exceptions.AlreadyExistsException as e: + raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e + + return self._glue_to_iceberg(glue_table, io) + + # tested + def load_table(self, identifier: Union[str, Identifier]) -> Table: + database_name, table_name = self.identifier_to_tuple(identifier) + try: + load_table_response = self.client.get_table(DatabaseName=database_name, Name=table_name) + self._check_response(load_table_response) + except self.client.exceptions.EntityNotFoundException as e: + raise NoSuchTableError(f"Table does not exists: {table_name}") from e + # TODO: may need to add table properties to the io too + io = load_file_io( Review Comment: The most important ones are properties and location. I guess that we could also infer the location from the properties, but I don't think that's the cleanest approach, and we should just get it from the location property. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
