rdblue commented on code in PR #5391: URL: https://github.com/apache/iceberg/pull/5391#discussion_r934033823
########## python/pyiceberg/catalog/hive.py: ########## @@ -0,0 +1,345 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from getpass import getuser +from time import time +from typing import ( + Any, + Dict, + List, + Optional, + Set, + Tuple, + Union, +) +from urllib.parse import urlparse + +from hive_metastore.ThriftHiveMetastore import Client +from hive_metastore.ttypes import AlreadyExistsException +from hive_metastore.ttypes import Database as HiveDatabase +from hive_metastore.ttypes import ( + FieldSchema, + InvalidOperationException, + MetaException, + NoSuchObjectException, + SerDeInfo, + StorageDescriptor, +) +from hive_metastore.ttypes import Table as HiveTable +from thrift.protocol import TBinaryProtocol +from thrift.transport import TSocket, TTransport + +from pyiceberg.catalog import Identifier, Properties +from pyiceberg.catalog.base import Catalog, PropertiesUpdateSummary +from pyiceberg.exceptions import ( + AlreadyExistsError, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, +) +from pyiceberg.schema import Schema +from pyiceberg.table.base import Table +from pyiceberg.table.partitioning import PartitionSpec +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from 
pyiceberg.types import ( + BinaryType, + BooleanType, + DateType, + DecimalType, + DoubleType, + FixedType, + FloatType, + IcebergType, + IntegerType, + ListType, + LongType, + MapType, + StringType, + StructType, + TimestampType, + TimeType, + UUIDType, +) + +# Replace by visitor +hive_types = { + BooleanType: "boolean", + IntegerType: "int", + LongType: "bigint", + FloatType: "float", + DoubleType: "double", + DateType: "date", + TimeType: "string", + TimestampType: "timestamp", + StringType: "string", + UUIDType: "string", + BinaryType: "binary", + FixedType: "binary", + DecimalType: None, + StructType: None, + ListType: None, + MapType: None, +} + + +class _HiveClient: + """Helper class to nicely open and close the transport""" + + _transport: TTransport + _client: Client + + def __init__(self, uri: str): + url_parts = urlparse(uri) + transport = TSocket.TSocket(url_parts.hostname, url_parts.port) + self._transport = TTransport.TBufferedTransport(transport) + protocol = TBinaryProtocol.TBinaryProtocol(transport) + + self._client = Client(protocol) + + def __enter__(self) -> Client: + self._transport.open() + return self._client + + def __exit__(self, exc_type, exc_val, exc_tb): + self._transport.close() + + +def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str]) -> StorageDescriptor: + ser_de_info = SerDeInfo(serializationLib="org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + return StorageDescriptor( + _convert_schema_to_columns(schema), + location, + "org.apache.hadoop.mapred.FileInputFormat", + "org.apache.hadoop.mapred.FileOutputFormat", + serdeInfo=ser_de_info, + ) + + +def _convert_schema_to_columns(schema: Schema) -> List[FieldSchema]: + return [FieldSchema(field.name, _iceberg_type_to_hive_types(field.field_type), field.doc) for field in schema.fields] + + +def _iceberg_type_to_hive_types(col_type: IcebergType) -> str: + if hive_type := hive_types.get(type(col_type)): + return hive_type + raise NotImplementedError(f"Not yet 
implemented column type {col_type}") + + +PROP_EXTERNAL = "EXTERNAL" +PROP_TABLE_TYPE = "table_type" +PROP_METADATA_LOCATION = "metadata_location" +PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location" + + +def _construct_parameters(metadata_location: str, previous_metadata_location: Optional[str] = None) -> Dict[str, Any]: + properties = {PROP_EXTERNAL: "TRUE", PROP_TABLE_TYPE: "ICEBERG", PROP_METADATA_LOCATION: metadata_location} + if previous_metadata_location: + properties[previous_metadata_location] = previous_metadata_location + + return properties + + +def _annotate_namespace(database: HiveDatabase, properties: Properties) -> HiveDatabase: + params = {} + for key, value in properties.items(): + if key == "comment": + database.description = value + elif key == "comment": + database.description = value + else: + params[key] = value + database.parameters = params + return database + + +class HiveCatalog(Catalog): + _client: _HiveClient + + @staticmethod + def identifier_to_database(identifier: Union[str, Identifier]) -> str: + tuple_identifier = Catalog.identifier_to_tuple(identifier) + if len(tuple_identifier) != 1: + raise ValueError("We expect a database name, Hive does not support hierarchical namespaces") + + return tuple_identifier[0] + + @staticmethod + def identifier_to_database_table(identifier: Union[str, Identifier]) -> Tuple[str, str]: + tuple_identifier = Catalog.identifier_to_tuple(identifier) + if len(tuple_identifier) != 2: + raise ValueError("We expect a database.table, Hive does not support hierarchical namespaces") Review Comment: Similar update needed here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For additional commands, e-mail: issues-help@iceberg.apache.org